Skip to content
This repository has been archived by the owner on Mar 16, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1 from StrongMonkey/acorn-v2.6.2
Browse files Browse the repository at this point in the history
Support allocating shared LB for services
  • Loading branch information
StrongMonkey authored Nov 3, 2023
2 parents 5e07dce + 6c74e5f commit 5af52aa
Show file tree
Hide file tree
Showing 13 changed files with 433 additions and 44 deletions.
141 changes: 131 additions & 10 deletions controllers/service/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package service
import (
"context"
"fmt"
"sync"

"github.com/go-logr/logr"
"github.com/google/uuid"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/aws-load-balancer-controller/controllers/service/eventhandlers"
Expand All @@ -33,6 +36,8 @@ const (
serviceTagPrefix = "service.k8s.aws"
serviceAnnotationPrefix = "service.beta.kubernetes.io"
controllerName = "service"

nlbListenerLimit = 50
)

func NewServiceReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorder record.EventRecorder,
Expand All @@ -48,7 +53,7 @@ func NewServiceReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorde
modelBuilder := service.NewDefaultModelBuilder(annotationParser, subnetsResolver, vpcInfoProvider, cloud.VpcID(), trackingProvider,
elbv2TaggingManager, cloud.EC2(), controllerConfig.FeatureGates, controllerConfig.ClusterName, controllerConfig.DefaultTags, controllerConfig.ExternalManagedTags,
controllerConfig.DefaultSSLPolicy, controllerConfig.DefaultTargetType, controllerConfig.FeatureGates.Enabled(config.EnableIPTargetType), serviceUtils,
backendSGProvider, sgResolver, controllerConfig.EnableBackendSecurityGroup, controllerConfig.DisableRestrictedSGRules)
backendSGProvider, sgResolver, controllerConfig.EnableBackendSecurityGroup, controllerConfig.DisableRestrictedSGRules, k8sClient)
stackMarshaller := deploy.NewDefaultStackMarshaller()
stackDeployer := deploy.NewDefaultStackDeployer(cloud, k8sClient, networkingSGManager, networkingSGReconciler, controllerConfig, serviceTagPrefix, logger)
return &serviceReconciler{
Expand All @@ -65,6 +70,9 @@ func NewServiceReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorde
stackDeployer: stackDeployer,
logger: logger,

allocatedServices: map[string]map[int32]struct{}{},
lock: sync.Mutex{},

maxConcurrentReconciles: controllerConfig.ServiceMaxConcurrentReconciles,
}
}
Expand All @@ -83,6 +91,9 @@ type serviceReconciler struct {
stackDeployer deploy.StackDeployer
logger logr.Logger

allocatedServices map[string]map[int32]struct{}
initialized bool
lock sync.Mutex
maxConcurrentReconciles int
}

Expand All @@ -99,16 +110,101 @@ func (r *serviceReconciler) reconcile(ctx context.Context, req ctrl.Request) err
if err := r.k8sClient.Get(ctx, req.NamespacedName, svc); err != nil {
return client.IgnoreNotFound(err)
}
if svc.Annotations[service.LoadBalancerAllocatingPortKey] == "true" {
// allocatedService must run under r.lock because it reads and writes the shared allocatedServices map, which is not safe for concurrent use.
r.lock.Lock()
if err := r.allocatedService(ctx, svc); err != nil {
r.lock.Unlock()
return err
}
r.lock.Unlock()
}

stack, lb, backendSGRequired, err := r.buildModel(ctx, svc)
if err != nil {
return err
}
if lb == nil {
return r.cleanupLoadBalancerResources(ctx, svc, stack)
if lb == nil || !svc.DeletionTimestamp.IsZero() {
return r.cleanupLoadBalancerResources(ctx, svc, stack, lb == nil)
}
return r.reconcileLoadBalancerResources(ctx, svc, stack, lb, backendSGRequired)
}

// allocatedService assigns svc to a virtual stack and records the choice in the
// service.LoadBalancerStackKey annotation. A stack is only reused while the number
// of node ports tracked for it, plus the ports of svc, stays within nlbListenerLimit;
// otherwise a new stack named by a fresh UUID is created.
//
// Usage is tracked in the in-memory r.allocatedServices cache (stack name -> set of
// node ports). The cache is seeded once, on the first call, from every listed service
// that already carries the stack annotation. For a service that is being deleted the
// function only releases its node ports from the cache.
//
// The caller is expected to hold r.lock, since the cache is a plain map.
// It returns any error from listing or updating services through r.k8sClient.
func (r *serviceReconciler) allocatedService(ctx context.Context, svc *corev1.Service) error {
	// One-time seeding of the cache from the annotations already stored on services.
	if !r.initialized {
		var existing corev1.ServiceList
		if err := r.k8sClient.List(ctx, &existing); err != nil {
			return err
		}
		for i := range existing.Items {
			item := &existing.Items[i]
			name := item.Annotations[service.LoadBalancerStackKey]
			if name == "" {
				continue
			}
			ports := r.allocatedServices[name]
			if ports == nil {
				ports = map[int32]struct{}{}
				r.allocatedServices[name] = ports
			}
			for _, p := range item.Spec.Ports {
				ports[p.NodePort] = struct{}{}
			}
		}
		r.initialized = true
	}

	stackKey := svc.Annotations[service.LoadBalancerStackKey]

	// Deletion path: give the service's node ports back and drop stacks that became empty.
	if !svc.DeletionTimestamp.IsZero() {
		ports, tracked := r.allocatedServices[stackKey]
		if !tracked {
			return nil
		}
		for _, p := range svc.Spec.Ports {
			delete(ports, p.NodePort)
		}
		if len(ports) == 0 {
			delete(r.allocatedServices, stackKey)
		}
		return nil
	}

	// Nothing to do unless this is a LoadBalancer service that opted into a shared LB
	// and has not been assigned to a stack yet.
	switch {
	case svc.Spec.Type != corev1.ServiceTypeLoadBalancer,
		svc.Annotations[service.LoadBalancerAllocatingPortKey] != "true",
		stackKey != "":
		return nil
	}

	// Pick any tracked stack with enough room left; map iteration order is unspecified,
	// so which of several eligible stacks is chosen is not deterministic.
	needed := len(svc.Spec.Ports)
	var target string
	found := false
	for name, ports := range r.allocatedServices {
		if len(ports)+needed <= nlbListenerLimit {
			target, found = name, true
			break
		}
	}
	if !found {
		target = uuid.New().String()
	}

	// Persist the annotation first; the cache is only touched once the update succeeded.
	svc.Annotations[service.LoadBalancerStackKey] = target
	if err := r.k8sClient.Update(ctx, svc); err != nil {
		return err
	}

	ports := r.allocatedServices[target]
	if ports == nil {
		ports = map[int32]struct{}{}
		r.allocatedServices[target] = ports
	}
	for _, p := range svc.Spec.Ports {
		ports[p.NodePort] = struct{}{}
	}
	return nil
}

func (r *serviceReconciler) buildModel(ctx context.Context, svc *corev1.Service) (core.Stack, *elbv2model.LoadBalancer, bool, error) {
stack, lb, backendSGRequired, err := r.modelBuilder.Build(ctx, svc)
if err != nil {
Expand Down Expand Up @@ -140,10 +236,15 @@ func (r *serviceReconciler) reconcileLoadBalancerResources(ctx context.Context,
r.eventRecorder.Event(svc, corev1.EventTypeWarning, k8s.ServiceEventReasonFailedAddFinalizer, fmt.Sprintf("Failed add finalizer due to %v", err))
return err
}
// always lock the stack deploying models. Since stack can be accessed by multiple goroutines from multiple service reconciliation loops,
// make sure only one goroutine is building the model at a time to guarantee thread safety.
stack.Lock()
err := r.deployModel(ctx, svc, stack)
if err != nil {
stack.Unlock()
return err
}
stack.Unlock()
lbDNS, err := lb.DNSName().Resolve(ctx)
if err != nil {
return err
Expand All @@ -163,15 +264,28 @@ func (r *serviceReconciler) reconcileLoadBalancerResources(ctx context.Context,
return nil
}

func (r *serviceReconciler) cleanupLoadBalancerResources(ctx context.Context, svc *corev1.Service, stack core.Stack) error {
func (r *serviceReconciler) cleanupLoadBalancerResources(ctx context.Context, svc *corev1.Service, stack core.Stack, cleanlb bool) error {
if k8s.HasFinalizer(svc, serviceFinalizer) {
stack.Lock()
err := r.deployModel(ctx, svc, stack)
if err != nil {
stack.Unlock()
return err
}
if err := r.backendSGProvider.Release(ctx, networking.ResourceTypeService, []types.NamespacedName{k8s.NamespacedName(svc)}); err != nil {
return err
stack.Unlock()
if cleanlb {
nsName := k8s.NamespacedName(svc)
if svc.Annotations[service.LoadBalancerAllocatingPortKey] == "true" {
nsName = types.NamespacedName{
Namespace: "stack",
Name: svc.Annotations[service.LoadBalancerStackKey],
}
}
if err := r.backendSGProvider.Release(ctx, networking.ResourceTypeService, []types.NamespacedName{nsName}); err != nil && !apierrors.IsNotFound(err) {
return err
}
}

if err = r.cleanupServiceStatus(ctx, svc); err != nil {
r.eventRecorder.Event(svc, corev1.EventTypeWarning, k8s.ServiceEventReasonFailedCleanupStatus, fmt.Sprintf("Failed update status due to %v", err))
return err
Expand All @@ -189,11 +303,17 @@ func (r *serviceReconciler) updateServiceStatus(ctx context.Context, lbDNS strin
svc.Status.LoadBalancer.Ingress[0].IP != "" ||
svc.Status.LoadBalancer.Ingress[0].Hostname != lbDNS {
svcOld := svc.DeepCopy()
svc.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{
{
Hostname: lbDNS,
},
ingress := corev1.LoadBalancerIngress{
Hostname: lbDNS,
}
if svc.Annotations[service.LoadBalancerAllocatingPortKey] == "true" {
for _, port := range svc.Spec.Ports {
ingress.Ports = append(ingress.Ports, corev1.PortStatus{
Port: port.NodePort,
})
}
}
svc.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{ingress}
if err := r.k8sClient.Status().Patch(ctx, svc, client.MergeFrom(svcOld)); err != nil {
return errors.Wrapf(err, "failed to update service status: %v", k8s.NamespacedName(svc))
}
Expand Down Expand Up @@ -221,6 +341,7 @@ func (r *serviceReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag
if err := r.setupWatches(ctx, c); err != nil {
return err
}

return nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/deploy/ec2/security_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package ec2

import (
"context"
"time"

awssdk "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
ec2sdk "github.com/aws/aws-sdk-go/service/ec2"
Expand All @@ -12,7 +14,6 @@ import (
ec2model "sigs.k8s.io/aws-load-balancer-controller/pkg/model/ec2"
"sigs.k8s.io/aws-load-balancer-controller/pkg/networking"
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
"time"
)

const (
Expand Down
1 change: 1 addition & 0 deletions pkg/deploy/ec2/security_group_synthesizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ec2

import (
"context"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down
20 changes: 20 additions & 0 deletions pkg/model/core/graph/resource_graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type ResourceGraph interface {
// Add a node into ResourceGraph.
AddNode(node ResourceUID)

RemoveNode(node ResourceUID)

// Add a edge into ResourceGraph, where dstNode depends on srcNode.
AddEdge(srcNode ResourceUID, dstNode ResourceUID)

Expand Down Expand Up @@ -44,6 +46,24 @@ func (g *defaultResourceGraph) AddNode(node ResourceUID) {
g.nodes = append(g.nodes, node)
}

// RemoveNode removes node from the graph: it is dropped from the node list, its
// outgoing edges are deleted, and every edge that points at it is removed from the
// remaining nodes' edge lists. Removing a node that is not in the graph is a no-op.
func (g *defaultResourceGraph) RemoveNode(node ResourceUID) {
	// Filter in place, reusing the backing array of g.nodes.
	keptNodes := g.nodes[:0]
	for _, n := range g.nodes {
		if n != node {
			keptNodes = append(keptNodes, n)
		}
	}
	g.nodes = keptNodes

	delete(g.outEdges, node)

	// The shortened slice must be stored back under its key: the range variable is
	// only a copy of the slice header, so re-slicing it alone would leave the map
	// entry at its old length (keeping or duplicating stale edges).
	for src, dsts := range g.outEdges {
		keptEdges := dsts[:0]
		for _, d := range dsts {
			if d != node {
				keptEdges = append(keptEdges, d)
			}
		}
		g.outEdges[src] = keptEdges
	}
}

// Add a edge into ResourceGraph, where dstNode depends on srcNode.
func (g *defaultResourceGraph) AddEdge(srcNode ResourceUID, dstNode ResourceUID) {
g.outEdges[srcNode] = append(g.outEdges[srcNode], dstNode)
Expand Down
54 changes: 53 additions & 1 deletion pkg/model/core/stack.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package core

import (
"github.com/pkg/errors"
"reflect"
"sync"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/aws-load-balancer-controller/pkg/model/core/graph"
)

Expand All @@ -14,6 +17,19 @@ type Stack interface {
// Add a resource into stack.
AddResource(res Resource) error

// Remove a resource
RemoveResource(id graph.ResourceUID)

AddService(service *corev1.Service)

RemoveService(service *corev1.Service)

ListServices() []corev1.Service

Lock()

Unlock()

// Add a dependency relationship between resources.
AddDependency(dependee Resource, depender Resource) error

Expand All @@ -30,8 +46,10 @@ func NewDefaultStack(stackID StackID) *defaultStack {
return &defaultStack{
stackID: stackID,

services: make(map[string]corev1.Service),
resources: make(map[graph.ResourceUID]Resource),
resourceGraph: graph.NewDefaultResourceGraph(),
lock: sync.Mutex{},
}
}

Expand All @@ -41,8 +59,11 @@ var _ Stack = &defaultStack{}
// defaultStack holds the state backing the Stack interface: the stack identity,
// the services attached to it, its resources and their dependency graph.
type defaultStack struct {
// stackID is the identifier returned by StackID().
stackID StackID

// services holds a copy of each attached Service, keyed by string(service.UID).
services map[string]corev1.Service
// resources maps a resource's graph UID to the resource itself.
resources map[graph.ResourceUID]Resource
// resourceGraph tracks the resource nodes and their dependency edges.
resourceGraph graph.ResourceGraph

// lock is exposed through Lock()/Unlock(); the stack's own methods do not take it,
// so callers are responsible for holding it where they need mutual exclusion.
lock sync.Mutex
}

func (s *defaultStack) StackID() StackID {
Expand All @@ -60,6 +81,26 @@ func (s *defaultStack) AddResource(res Resource) error {
return nil
}

// RemoveResource deletes the resource registered under id from the stack's
// resource map and removes the matching node from the resource graph.
// Deleting an id that is not in the resource map is a no-op for the map.
func (s *defaultStack) RemoveResource(id graph.ResourceUID) {
delete(s.resources, id)
s.resourceGraph.RemoveNode(id)
}

// AddService stores a copy of the given Service in the stack, keyed by its UID.
// A service that was added before under the same UID is overwritten.
// The copy is a plain struct copy of *service, not a deep copy.
func (s *defaultStack) AddService(service *corev1.Service) {
	key := string(service.UID)
	s.services[key] = *service
}

// RemoveService forgets the Service stored under the given service's UID.
// It is a no-op when no service with that UID is attached to the stack.
func (s *defaultStack) RemoveService(service *corev1.Service) {
	key := string(service.UID)
	delete(s.services, key)
}

// ListServices returns copies of all services currently attached to the stack.
// The order is unspecified because it follows map iteration; the result is nil
// when no service is attached.
func (s *defaultStack) ListServices() (result []corev1.Service) {
	if len(s.services) == 0 {
		return nil
	}
	result = make([]corev1.Service, 0, len(s.services))
	for _, svc := range s.services {
		result = append(result, svc)
	}
	return result
}

// Add a dependency relationship between resources.
func (s *defaultStack) AddDependency(dependee Resource, depender Resource) error {
dependeeResUID := s.computeResourceUID(dependee)
Expand Down Expand Up @@ -101,6 +142,9 @@ func (s *defaultStack) ListResources(pResourceSlice interface{}) error {

// TopologicalTraversal walks the stack's resource graph via graph.TopologicalTraversal
// and calls visitor.Visit for each resource. Graph nodes that have no entry in the
// stack's resource map are skipped without error. The first error returned by the
// visitor is passed back through graph.TopologicalTraversal.
func (s *defaultStack) TopologicalTraversal(visitor ResourceVisitor) error {
	visit := func(uid graph.ResourceUID) error {
		res, found := s.resources[uid]
		if !found {
			// Node without a backing resource: nothing to visit.
			return nil
		}
		return visitor.Visit(res)
	}
	return graph.TopologicalTraversal(s.resourceGraph, visit)
}
Expand All @@ -112,3 +156,11 @@ func (s *defaultStack) computeResourceUID(res Resource) graph.ResourceUID {
ResID: res.ID(),
}
}

// Lock acquires the stack's mutex, blocking until it is available.
// Every call must be paired with a later Unlock.
func (s *defaultStack) Lock() {
s.lock.Lock()
}

// Unlock releases the stack's mutex previously acquired with Lock.
func (s *defaultStack) Unlock() {
s.lock.Unlock()
}
Loading

0 comments on commit 5af52aa

Please sign in to comment.