diff --git a/controllers/service/service_controller.go b/controllers/service/service_controller.go index 1b51e9c72..fef585330 100644 --- a/controllers/service/service_controller.go +++ b/controllers/service/service_controller.go @@ -3,10 +3,13 @@ package service import ( "context" "fmt" + "sync" "github.com/go-logr/logr" + "github.com/google/uuid" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "sigs.k8s.io/aws-load-balancer-controller/controllers/service/eventhandlers" @@ -33,6 +36,8 @@ const ( serviceTagPrefix = "service.k8s.aws" serviceAnnotationPrefix = "service.beta.kubernetes.io" controllerName = "service" + + nlbListenerLimit = 50 ) func NewServiceReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorder record.EventRecorder, @@ -48,7 +53,7 @@ func NewServiceReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorde modelBuilder := service.NewDefaultModelBuilder(annotationParser, subnetsResolver, vpcInfoProvider, cloud.VpcID(), trackingProvider, elbv2TaggingManager, cloud.EC2(), controllerConfig.FeatureGates, controllerConfig.ClusterName, controllerConfig.DefaultTags, controllerConfig.ExternalManagedTags, controllerConfig.DefaultSSLPolicy, controllerConfig.DefaultTargetType, controllerConfig.FeatureGates.Enabled(config.EnableIPTargetType), serviceUtils, - backendSGProvider, sgResolver, controllerConfig.EnableBackendSecurityGroup, controllerConfig.DisableRestrictedSGRules) + backendSGProvider, sgResolver, controllerConfig.EnableBackendSecurityGroup, controllerConfig.DisableRestrictedSGRules, k8sClient) stackMarshaller := deploy.NewDefaultStackMarshaller() stackDeployer := deploy.NewDefaultStackDeployer(cloud, k8sClient, networkingSGManager, networkingSGReconciler, controllerConfig, serviceTagPrefix, logger) return &serviceReconciler{ @@ -65,6 +70,9 @@ func NewServiceReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorde stackDeployer: stackDeployer, logger: logger, + allocatedServices: map[string]map[int32]struct{}{}, + lock: sync.Mutex{}, + maxConcurrentReconciles: controllerConfig.ServiceMaxConcurrentReconciles, } } @@ -83,6 +91,9 @@ type serviceReconciler struct { stackDeployer deploy.StackDeployer logger logr.Logger + allocatedServices map[string]map[int32]struct{} + initialized bool + lock sync.Mutex maxConcurrentReconciles int } @@ -99,16 +110,101 @@ func (r *serviceReconciler) reconcile(ctx context.Context, req ctrl.Request) err if err := r.k8sClient.Get(ctx, req.NamespacedName, svc); err != nil { return client.IgnoreNotFound(err) } + if svc.Annotations[service.LoadBalancerAllocatingPortKey] == "true" { + // allocatedService must run while holding the lock to guarantee thread safety, since it reads and writes the allocation map concurrently + r.lock.Lock() + if err := r.allocatedService(ctx, svc); err != nil { + r.lock.Unlock() + return err + } + r.lock.Unlock() + } + stack, lb, backendSGRequired, err := r.buildModel(ctx, svc) if err != nil { return err } - if lb == nil { - return r.cleanupLoadBalancerResources(ctx, svc, stack) + if lb == nil || !svc.DeletionTimestamp.IsZero() { + return r.cleanupLoadBalancerResources(ctx, svc, stack, lb == nil) } return r.reconcileLoadBalancerResources(ctx, svc, stack, lb, backendSGRequired) } +// allocatedService makes sure that each service is allocated to a virtual stack, and that a stack never holds more than 50 services/listeners (the NLB listener limit). +// It maintains an in-memory cache to track the usage. 
If no stack is available, it will create a new stack. +func (r *serviceReconciler) allocatedService(ctx context.Context, svc *corev1.Service) error { + if !r.initialized { + var serviceList corev1.ServiceList + if err := r.k8sClient.List(ctx, &serviceList); err != nil { + return err + } + for _, svc := range serviceList.Items { + if svc.Annotations[service.LoadBalancerStackKey] != "" { + if r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]] == nil { + r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]] = map[int32]struct{}{} + } + for _, port := range svc.Spec.Ports { + r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]][port.NodePort] = struct{}{} + } + } + } + r.initialized = true + } + + if !svc.DeletionTimestamp.IsZero() { + if _, ok := r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]]; !ok { + return nil + } + + for _, port := range svc.Spec.Ports { + delete(r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]], port.NodePort) + } + + if len(r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]]) == 0 { + delete(r.allocatedServices, svc.Annotations[service.LoadBalancerStackKey]) + } + + return nil + } + + // If the service is not of type LoadBalancer, is not opting into a shared load balancer, or has already been allocated to a stack, skip allocation + if svc.Spec.Type != corev1.ServiceTypeLoadBalancer || svc.Annotations[service.LoadBalancerAllocatingPortKey] != "true" || svc.Annotations[service.LoadBalancerStackKey] != "" { + return nil + } + + allocated := false + for stackName := range r.allocatedServices { + usedPort := r.allocatedServices[stackName] + if len(usedPort) <= nlbListenerLimit-len(svc.Spec.Ports) { + svc.Annotations[service.LoadBalancerStackKey] = stackName + if err := r.k8sClient.Update(ctx, svc); err != nil { + return err + } + for _, port := range svc.Spec.Ports { + usedPort[port.NodePort] = struct{}{} + } + r.allocatedServices[stackName] = usedPort + allocated = true + break + } + } + + if !allocated { + stackName := uuid.New().String() + svc.Annotations[service.LoadBalancerStackKey] = stackName + if err := r.k8sClient.Update(ctx, svc); err != nil { + return err + } + if r.allocatedServices[stackName] == nil { + r.allocatedServices[stackName] = map[int32]struct{}{} + } + for _, port := range svc.Spec.Ports { + r.allocatedServices[stackName][port.NodePort] = struct{}{} + } + } + return nil +} + func (r *serviceReconciler) buildModel(ctx context.Context, svc *corev1.Service) (core.Stack, *elbv2model.LoadBalancer, bool, error) { stack, lb, backendSGRequired, err := r.modelBuilder.Build(ctx, svc) if err != nil { @@ -140,10 +236,15 @@ func (r *serviceReconciler) reconcileLoadBalancerResources(ctx context.Context, r.eventRecorder.Event(svc, corev1.EventTypeWarning, k8s.ServiceEventReasonFailedAddFinalizer, fmt.Sprintf("Failed add finalizer due to %v", err)) return err } + // always lock the stack while deploying the model. Since the stack can be accessed by multiple goroutines from multiple service reconciliation loops, + // make sure only one goroutine deploys it at a time to guarantee thread safety. 
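The allocation above is a first-fit scan: reuse any stack whose listener count still has room for all of the service's ports, otherwise mint a new stack keyed by a UUID. A minimal, self-contained sketch of that bookkeeping, assuming the same 50-listener limit (pickStack and the plain port slice are illustrative names, not part of this PR):

package main

import (
	"fmt"

	"github.com/google/uuid"
)

// nlbListenerLimit mirrors the constant introduced in this PR: an NLB supports
// at most 50 listeners, so a shared stack may not exceed that many ports.
const nlbListenerLimit = 50

// pickStack is an illustrative re-implementation of the first-fit logic in
// allocatedService: reuse the first stack that still has room for all of the
// service's node ports, otherwise mint a new stack name.
func pickStack(allocated map[string]map[int32]struct{}, nodePorts []int32) string {
	for name, used := range allocated {
		if len(used)+len(nodePorts) <= nlbListenerLimit {
			for _, p := range nodePorts {
				used[p] = struct{}{}
			}
			return name
		}
	}
	// No stack has capacity; create a fresh one keyed by a UUID, as the PR does.
	name := uuid.New().String()
	ports := make(map[int32]struct{}, len(nodePorts))
	for _, p := range nodePorts {
		ports[p] = struct{}{}
	}
	allocated[name] = ports
	return name
}

func main() {
	allocated := map[string]map[int32]struct{}{}
	fmt.Println(pickStack(allocated, []int32{30080, 30443})) // creates a new stack
	fmt.Println(pickStack(allocated, []int32{31000}))        // reuses the same stack
}

Because the cache is a plain map shared across reconciles, the real controller only calls allocatedService while holding r.lock, as shown in reconcile above.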
+ stack.Lock() err := r.deployModel(ctx, svc, stack) if err != nil { + stack.Unlock() return err } + stack.Unlock() lbDNS, err := lb.DNSName().Resolve(ctx) if err != nil { return err @@ -163,15 +264,28 @@ func (r *serviceReconciler) reconcileLoadBalancerResources(ctx context.Context, return nil } -func (r *serviceReconciler) cleanupLoadBalancerResources(ctx context.Context, svc *corev1.Service, stack core.Stack) error { +func (r *serviceReconciler) cleanupLoadBalancerResources(ctx context.Context, svc *corev1.Service, stack core.Stack, cleanlb bool) error { if k8s.HasFinalizer(svc, serviceFinalizer) { + stack.Lock() err := r.deployModel(ctx, svc, stack) if err != nil { + stack.Unlock() return err } - if err := r.backendSGProvider.Release(ctx, networking.ResourceTypeService, []types.NamespacedName{k8s.NamespacedName(svc)}); err != nil { - return err + stack.Unlock() + if cleanlb { + nsName := k8s.NamespacedName(svc) + if svc.Annotations[service.LoadBalancerAllocatingPortKey] == "true" { + nsName = types.NamespacedName{ + Namespace: "stack", + Name: svc.Annotations[service.LoadBalancerStackKey], + } + } + if err := r.backendSGProvider.Release(ctx, networking.ResourceTypeService, []types.NamespacedName{nsName}); err != nil && !apierrors.IsNotFound(err) { + return err + } } + if err = r.cleanupServiceStatus(ctx, svc); err != nil { r.eventRecorder.Event(svc, corev1.EventTypeWarning, k8s.ServiceEventReasonFailedCleanupStatus, fmt.Sprintf("Failed update status due to %v", err)) return err @@ -189,11 +303,17 @@ func (r *serviceReconciler) updateServiceStatus(ctx context.Context, lbDNS strin svc.Status.LoadBalancer.Ingress[0].IP != "" || svc.Status.LoadBalancer.Ingress[0].Hostname != lbDNS { svcOld := svc.DeepCopy() - svc.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{ - { - Hostname: lbDNS, - }, + ingress := corev1.LoadBalancerIngress{ + Hostname: lbDNS, } + if svc.Annotations[service.LoadBalancerAllocatingPortKey] == "true" { + for _, port := range svc.Spec.Ports { + ingress.Ports = append(ingress.Ports, corev1.PortStatus{ + Port: port.NodePort, + }) + } + } + svc.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{ingress} if err := r.k8sClient.Status().Patch(ctx, svc, client.MergeFrom(svcOld)); err != nil { return errors.Wrapf(err, "failed to update service status: %v", k8s.NamespacedName(svc)) } @@ -221,6 +341,7 @@ func (r *serviceReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag if err := r.setupWatches(ctx, c); err != nil { return err } + return nil } diff --git a/pkg/deploy/ec2/security_group_manager.go b/pkg/deploy/ec2/security_group_manager.go index 9fc84ceeb..37bd33247 100644 --- a/pkg/deploy/ec2/security_group_manager.go +++ b/pkg/deploy/ec2/security_group_manager.go @@ -2,6 +2,8 @@ package ec2 import ( "context" + "time" + awssdk "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" ec2sdk "github.com/aws/aws-sdk-go/service/ec2" @@ -12,7 +14,6 @@ import ( ec2model "sigs.k8s.io/aws-load-balancer-controller/pkg/model/ec2" "sigs.k8s.io/aws-load-balancer-controller/pkg/networking" "sigs.k8s.io/aws-load-balancer-controller/pkg/runtime" - "time" ) const ( diff --git a/pkg/deploy/ec2/security_group_synthesizer.go b/pkg/deploy/ec2/security_group_synthesizer.go index b5e76845d..e3244452f 100644 --- a/pkg/deploy/ec2/security_group_synthesizer.go +++ b/pkg/deploy/ec2/security_group_synthesizer.go @@ -2,6 +2,7 @@ package ec2 import ( "context" + "github.com/go-logr/logr" "github.com/pkg/errors" "k8s.io/apimachinery/pkg/util/sets" diff --git 
a/pkg/model/core/graph/resource_graph.go b/pkg/model/core/graph/resource_graph.go index 78413c9aa..157552ffb 100644 --- a/pkg/model/core/graph/resource_graph.go +++ b/pkg/model/core/graph/resource_graph.go @@ -13,6 +13,8 @@ type ResourceGraph interface { // Add a node into ResourceGraph. AddNode(node ResourceUID) + RemoveNode(node ResourceUID) + // Add a edge into ResourceGraph, where dstNode depends on srcNode. AddEdge(srcNode ResourceUID, dstNode ResourceUID) @@ -44,6 +46,24 @@ func (g *defaultResourceGraph) AddNode(node ResourceUID) { g.nodes = append(g.nodes, node) } +func (g *defaultResourceGraph) RemoveNode(node ResourceUID) { + for i, n := range g.nodes { + if n == node { + g.nodes = append(g.nodes[:i], g.nodes[i+1:]...) + break + } + } + delete(g.outEdges, node) + for src, nodes := range g.outEdges { + for i, n := range nodes { + if n == node { + g.outEdges[src] = append(nodes[:i], nodes[i+1:]...) + break + } + } + } +} + // Add a edge into ResourceGraph, where dstNode depends on srcNode. func (g *defaultResourceGraph) AddEdge(srcNode ResourceUID, dstNode ResourceUID) { g.outEdges[srcNode] = append(g.outEdges[srcNode], dstNode) diff --git a/pkg/model/core/stack.go b/pkg/model/core/stack.go index 9329bc121..ef808936f 100644 --- a/pkg/model/core/stack.go +++ b/pkg/model/core/stack.go @@ -1,8 +1,11 @@ package core import ( - "github.com/pkg/errors" "reflect" + "sync" + + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" "sigs.k8s.io/aws-load-balancer-controller/pkg/model/core/graph" ) @@ -14,6 +17,19 @@ type Stack interface { // Add a resource into stack. AddResource(res Resource) error + // Remove a resource from the stack. + RemoveResource(id graph.ResourceUID) + + AddService(service *corev1.Service) + + RemoveService(service *corev1.Service) + + ListServices() []corev1.Service + + Lock() + + Unlock() + // Add a dependency relationship between resources. AddDependency(dependee Resource, depender Resource) error @@ -30,8 +46,10 @@ func NewDefaultStack(stackID StackID) *defaultStack { return &defaultStack{ stackID: stackID, + services: make(map[string]corev1.Service), resources: make(map[graph.ResourceUID]Resource), resourceGraph: graph.NewDefaultResourceGraph(), + lock: sync.Mutex{}, } } @@ -41,8 +59,11 @@ var _ Stack = &defaultStack{} type defaultStack struct { stackID StackID + services map[string]corev1.Service resources map[graph.ResourceUID]Resource resourceGraph graph.ResourceGraph + + lock sync.Mutex } func (s *defaultStack) StackID() StackID { @@ -60,6 +81,26 @@ func (s *defaultStack) AddResource(res Resource) error { return nil } +func (s *defaultStack) RemoveResource(id graph.ResourceUID) { + delete(s.resources, id) + s.resourceGraph.RemoveNode(id) +} + +func (s *defaultStack) AddService(service *corev1.Service) { + s.services[string(service.UID)] = *service +} + +func (s *defaultStack) RemoveService(service *corev1.Service) { + delete(s.services, string(service.UID)) +} + +func (s *defaultStack) ListServices() (result []corev1.Service) { + for _, service := range s.services { + result = append(result, service) + } + return +} + // Add a dependency relationship between resources. 
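RemoveNode prunes an adjacency list that lives inside the outEdges map, so the shortened slice has to be written back to the map entry; append returns a new slice header, and assigning it only to the loop variable would leave the stored slice at its old length. A standalone illustration of the same pattern with the types reduced to strings (not controller code):

package main

import "fmt"

// removeFromEdges deletes target from every adjacency list in edges.
// The shortened slice must be stored back into the map entry, because the
// loop variable is only a copy of the slice header.
func removeFromEdges(edges map[string][]string, target string) {
	delete(edges, target)
	for src, dsts := range edges {
		for i, d := range dsts {
			if d == target {
				edges[src] = append(dsts[:i], dsts[i+1:]...)
				break
			}
		}
	}
}

func main() {
	edges := map[string][]string{
		"lb": {"listener-30080", "listener-30443"},
	}
	removeFromEdges(edges, "listener-30080")
	fmt.Println(edges) // map[lb:[listener-30443]]
}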
func (s *defaultStack) AddDependency(dependee Resource, depender Resource) error { dependeeResUID := s.computeResourceUID(dependee) @@ -101,6 +142,9 @@ func (s *defaultStack) ListResources(pResourceSlice interface{}) error { func (s *defaultStack) TopologicalTraversal(visitor ResourceVisitor) error { return graph.TopologicalTraversal(s.resourceGraph, func(uid graph.ResourceUID) error { + if _, ok := s.resources[uid]; !ok { + return nil + } return visitor.Visit(s.resources[uid]) }) } @@ -112,3 +156,11 @@ func (s *defaultStack) computeResourceUID(res Resource) graph.ResourceUID { ResID: res.ID(), } } + +func (s *defaultStack) Lock() { + s.lock.Lock() +} + +func (s *defaultStack) Unlock() { + s.lock.Unlock() +} diff --git a/pkg/model/ec2/security_group.go b/pkg/model/ec2/security_group.go index 3d40369f2..8b575f3e2 100644 --- a/pkg/model/ec2/security_group.go +++ b/pkg/model/ec2/security_group.go @@ -2,8 +2,11 @@ package ec2 import ( "context" + "reflect" + "github.com/pkg/errors" "sigs.k8s.io/aws-load-balancer-controller/pkg/model/core" + "sigs.k8s.io/aws-load-balancer-controller/pkg/model/core/graph" ) var _ core.Resource = &SecurityGroup{} @@ -26,6 +29,18 @@ func NewSecurityGroup(stack core.Stack, id string, spec SecurityGroupSpec) *Secu Spec: spec, Status: nil, } + // since security can be overwritten, remove and re-add it to the stack to make sure latest spec is updated + var sgRes []*SecurityGroup + stack.ListResources(&sgRes) + for _, res := range sgRes { + if res.ID() == id { + sg.Status = res.Status + } + } + stack.RemoveResource(graph.ResourceUID{ + ResType: reflect.TypeOf(&SecurityGroup{}), + ResID: id, + }) stack.AddResource(sg) return sg } diff --git a/pkg/service/model_build_listener.go b/pkg/service/model_build_listener.go index d320fe5ea..1258e535c 100644 --- a/pkg/service/model_build_listener.go +++ b/pkg/service/model_build_listener.go @@ -19,12 +19,18 @@ func (t *defaultModelBuildTask) buildListeners(ctx context.Context, scheme elbv2 return err } - for _, port := range t.service.Spec.Ports { - _, err := t.buildListener(ctx, port, *cfg, scheme) - if err != nil { - return err + originalSvc := t.service + for _, service := range t.stack.ListServices() { + t.service = &service + for _, port := range service.Spec.Ports { + _, err := t.buildListener(ctx, port, *cfg, scheme) + if err != nil { + return err + } } } + t.service = originalSvc + return nil } @@ -35,6 +41,9 @@ func (t *defaultModelBuildTask) buildListener(ctx context.Context, port corev1.S return nil, err } listenerResID := fmt.Sprintf("%v", port.Port) + if t.service.Annotations[LoadBalancerAllocatingPortKey] == "true" { + listenerResID = fmt.Sprintf("%v", port.NodePort) + } ls := elbv2model.NewListener(t.stack, listenerResID, lsSpec) return ls, nil } @@ -73,9 +82,13 @@ func (t *defaultModelBuildTask) buildListenerSpec(ctx context.Context, port core } defaultActions := t.buildListenerDefaultActions(ctx, targetGroup) + portNumber := int64(port.Port) + if t.service.Annotations[LoadBalancerAllocatingPortKey] == "true" { + portNumber = int64(port.NodePort) + } return elbv2model.ListenerSpec{ LoadBalancerARN: t.loadBalancer.LoadBalancerARN(), - Port: int64(port.Port), + Port: portNumber, Protocol: listenerProtocol, Certificates: certificates, SSLPolicy: sslPolicy, diff --git a/pkg/service/model_build_load_balancer.go b/pkg/service/model_build_load_balancer.go index 3d8535c49..30eb11243 100644 --- a/pkg/service/model_build_load_balancer.go +++ b/pkg/service/model_build_load_balancer.go @@ -43,6 +43,13 @@ func (t 
*defaultModelBuildTask) buildLoadBalancer(ctx context.Context, scheme el if err != nil { return err } + var resLBs []*elbv2model.LoadBalancer + t.stack.ListResources(&resLBs) + if len(resLBs) > 0 { + t.loadBalancer = resLBs[0] + t.loadBalancer.Spec = spec + return nil + } t.loadBalancer = elbv2model.NewLoadBalancer(t.stack, resourceIDLoadBalancer, spec) return nil } @@ -109,7 +116,14 @@ func (t *defaultModelBuildTask) buildLoadBalancerSecurityGroups(ctx context.Cont if !t.enableBackendSG { t.backendSGIDToken = managedSG.GroupID() } else { - backendSGID, err := t.backendSGProvider.Get(ctx, networking.ResourceTypeService, []types.NamespacedName{k8s.NamespacedName(t.service)}) + nsName := k8s.NamespacedName(t.service) + if t.service.Annotations[LoadBalancerAllocatingPortKey] == "true" { + nsName = types.NamespacedName{ + Namespace: "stack", + Name: t.service.Annotations[LoadBalancerStackKey], + } + } + backendSGID, err := t.backendSGProvider.Get(ctx, networking.ResourceTypeService, []types.NamespacedName{nsName}) if err != nil { return nil, err } @@ -473,6 +487,10 @@ func (t *defaultModelBuildTask) getAnnotationSpecificLbAttributes() (map[string] var invalidLoadBalancerNamePattern = regexp.MustCompile("[[:^alnum:]]") func (t *defaultModelBuildTask) buildLoadBalancerName(_ context.Context, scheme elbv2model.LoadBalancerScheme) (string, error) { + stackName := t.serviceUtils.GetServiceStackName(t.service) + if stackName != "" { + return stackName, nil + } var name string if exists := t.annotationParser.ParseStringAnnotation(annotations.SvcLBSuffixLoadBalancerName, &name, t.service.Annotations); exists { // The name of the loadbalancer can only have up to 32 characters diff --git a/pkg/service/model_build_load_balancer_test.go b/pkg/service/model_build_load_balancer_test.go index 46de842a6..018a744c7 100644 --- a/pkg/service/model_build_load_balancer_test.go +++ b/pkg/service/model_build_load_balancer_test.go @@ -1431,6 +1431,7 @@ func Test_defaultModelBuildTask_buildLoadBalancerName(t *testing.T) { service: tt.service, clusterName: tt.clusterName, annotationParser: annotations.NewSuffixAnnotationParser("service.beta.kubernetes.io"), + serviceUtils: &defaultServiceUtils{}, } got, err := task.buildLoadBalancerName(context.Background(), tt.scheme) if err != nil { diff --git a/pkg/service/model_build_managed_sg.go b/pkg/service/model_build_managed_sg.go index 60381f6cf..44e39dbcd 100644 --- a/pkg/service/model_build_managed_sg.go +++ b/pkg/service/model_build_managed_sg.go @@ -25,6 +25,7 @@ func (t *defaultModelBuildTask) buildManagedSecurityGroup(ctx context.Context, i if err != nil { return nil, err } + sg := ec2model.NewSecurityGroup(t.stack, resourceIDManagedSecurityGroup, sgSpec) return sg, nil } @@ -50,6 +51,10 @@ func (t *defaultModelBuildTask) buildManagedSecurityGroupSpec(ctx context.Contex var invalidSecurityGroupNamePtn, _ = regexp.Compile("[[:^alnum:]]") func (t *defaultModelBuildTask) buildManagedSecurityGroupName(_ context.Context) string { + stackName := t.serviceUtils.GetServiceStackName(t.service) + if stackName != "" { + return stackName + } uuidHash := sha256.New() _, _ = uuidHash.Write([]byte(t.clusterName)) _, _ = uuidHash.Write([]byte(t.service.Name)) @@ -68,31 +73,36 @@ func (t *defaultModelBuildTask) buildManagedSecurityGroupIngressPermissions(ctx if err != nil { return nil, err } - for _, port := range t.service.Spec.Ports { - listenPort := int64(port.Port) - for _, cidr := range cidrs { - if !strings.Contains(cidr, ":") { - permissions = append(permissions, 
ec2model.IPPermission{ - IPProtocol: strings.ToLower(string(port.Protocol)), - FromPort: awssdk.Int64(listenPort), - ToPort: awssdk.Int64(listenPort), - IPRanges: []ec2model.IPRange{ - { - CIDRIP: cidr, + for _, service := range t.stack.ListServices() { + for _, port := range service.Spec.Ports { + listenPort := int64(port.Port) + if service.Annotations[LoadBalancerAllocatingPortKey] == "true" { + listenPort = int64(port.NodePort) + } + for _, cidr := range cidrs { + if !strings.Contains(cidr, ":") { + permissions = append(permissions, ec2model.IPPermission{ + IPProtocol: strings.ToLower(string(port.Protocol)), + FromPort: awssdk.Int64(listenPort), + ToPort: awssdk.Int64(listenPort), + IPRanges: []ec2model.IPRange{ + { + CIDRIP: cidr, + }, }, - }, - }) - } else { - permissions = append(permissions, ec2model.IPPermission{ - IPProtocol: strings.ToLower(string(port.Protocol)), - FromPort: awssdk.Int64(listenPort), - ToPort: awssdk.Int64(listenPort), - IPv6Range: []ec2model.IPv6Range{ - { - CIDRIPv6: cidr, + }) + } else { + permissions = append(permissions, ec2model.IPPermission{ + IPProtocol: strings.ToLower(string(port.Protocol)), + FromPort: awssdk.Int64(listenPort), + ToPort: awssdk.Int64(listenPort), + IPv6Range: []ec2model.IPv6Range{ + { + CIDRIPv6: cidr, + }, }, - }, - }) + }) + } } } } diff --git a/pkg/service/model_builder.go b/pkg/service/model_builder.go index 94ea62ee4..3648aa252 100644 --- a/pkg/service/model_builder.go +++ b/pkg/service/model_builder.go @@ -2,12 +2,16 @@ package service import ( "context" + "fmt" + "reflect" "strconv" "sync" "github.com/aws/aws-sdk-go/service/ec2" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/aws-load-balancer-controller/pkg/annotations" "sigs.k8s.io/aws-load-balancer-controller/pkg/aws/services" @@ -16,8 +20,10 @@ import ( "sigs.k8s.io/aws-load-balancer-controller/pkg/deploy/tracking" "sigs.k8s.io/aws-load-balancer-controller/pkg/k8s" "sigs.k8s.io/aws-load-balancer-controller/pkg/model/core" + "sigs.k8s.io/aws-load-balancer-controller/pkg/model/core/graph" elbv2model "sigs.k8s.io/aws-load-balancer-controller/pkg/model/elbv2" "sigs.k8s.io/aws-load-balancer-controller/pkg/networking" + "sigs.k8s.io/controller-runtime/pkg/client" ) const ( @@ -26,6 +32,9 @@ const ( LoadBalancerTargetTypeIP = "ip" LoadBalancerTargetTypeInstance = "instance" lbAttrsDeletionProtection = "deletion_protection.enabled" + + LoadBalancerAllocatingPortKey = "service.beta.kubernetes.io/aws-load-balancer-allocating-port" + LoadBalancerStackKey = "service.beta.kubernetes.io/aws-load-balancer-stack-name" ) // ModelBuilder builds the model stack for the service resource. 
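With port allocation enabled, listeners and security-group rules are keyed by the service's NodePort instead of its port, as buildListener, buildListenerSpec, and the ingress-permission loop above show. A small sketch of that selection rule; the helper name listenerPort is illustrative, while the annotation keys mirror the constants added to pkg/service/model_builder.go:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
	// Constants mirroring those added in pkg/service/model_builder.go.
	loadBalancerAllocatingPortKey = "service.beta.kubernetes.io/aws-load-balancer-allocating-port"
	loadBalancerStackKey          = "service.beta.kubernetes.io/aws-load-balancer-stack-name"
)

// listenerPort mirrors the selection logic in buildListenerSpec: services that
// opt into port allocation get a listener on their NodePort, everything else
// keeps the existing behavior of listening on the service port.
func listenerPort(svc *corev1.Service, port corev1.ServicePort) int64 {
	if svc.Annotations[loadBalancerAllocatingPortKey] == "true" {
		return int64(port.NodePort)
	}
	return int64(port.Port)
}

func main() {
	svc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "demo",
			Namespace: "default",
			Annotations: map[string]string{
				loadBalancerAllocatingPortKey: "true",
				// Normally written by the controller once the service is allocated to a stack.
				loadBalancerStackKey: "3f6d1c9a-example",
			},
		},
		Spec: corev1.ServiceSpec{
			Type:  corev1.ServiceTypeLoadBalancer,
			Ports: []corev1.ServicePort{{Port: 80, NodePort: 30080, Protocol: corev1.ProtocolTCP}},
		},
	}
	fmt.Println(listenerPort(svc, svc.Spec.Ports[0])) // 30080
}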
@@ -40,7 +49,7 @@ func NewDefaultModelBuilder(annotationParser annotations.Parser, subnetsResolver elbv2TaggingManager elbv2deploy.TaggingManager, ec2Client services.EC2, featureGates config.FeatureGates, clusterName string, defaultTags map[string]string, externalManagedTags []string, defaultSSLPolicy string, defaultTargetType string, enableIPTargetType bool, serviceUtils ServiceUtils, backendSGProvider networking.BackendSGProvider, sgResolver networking.SecurityGroupResolver, enableBackendSG bool, - disableRestrictedSGRules bool) *defaultModelBuilder { + disableRestrictedSGRules bool, k8sClient client.Client) *defaultModelBuilder { return &defaultModelBuilder{ annotationParser: annotationParser, subnetsResolver: subnetsResolver, @@ -61,6 +70,8 @@ func NewDefaultModelBuilder(annotationParser annotations.Parser, subnetsResolver ec2Client: ec2Client, enableBackendSG: enableBackendSG, disableRestrictedSGRules: disableRestrictedSGRules, + stackGlobalCache: map[core.StackID]core.Stack{}, + client: k8sClient, } } @@ -80,6 +91,8 @@ type defaultModelBuilder struct { enableBackendSG bool disableRestrictedSGRules bool + stackGlobalCache map[core.StackID]core.Stack + clusterName string vpcID string defaultTags map[string]string @@ -87,10 +100,62 @@ type defaultModelBuilder struct { defaultSSLPolicy string defaultTargetType elbv2model.TargetType enableIPTargetType bool + + client client.Client + + initialized bool + lock sync.RWMutex } func (b *defaultModelBuilder) Build(ctx context.Context, service *corev1.Service) (core.Stack, *elbv2model.LoadBalancer, bool, error) { - stack := core.NewDefaultStack(core.StackID(k8s.NamespacedName(service))) + // Initialize the global cache if not initialized + if !b.initialized { + // if not initialized, we need to build the global cache based on existing services + var serviceList corev1.ServiceList + if b.client != nil { + if err := b.client.List(ctx, &serviceList); err != nil { + return nil, nil, false, err + } + for _, svc := range serviceList.Items { + if svc.Annotations[LoadBalancerStackKey] != "" && svc.DeletionTimestamp.IsZero() { + stackID := core.StackID(types.NamespacedName{ + Namespace: "stack", + Name: svc.Annotations[LoadBalancerStackKey], + }) + b.lock.Lock() + if b.stackGlobalCache[stackID] == nil { + b.stackGlobalCache[stackID] = core.NewDefaultStack(stackID) + } + b.stackGlobalCache[stackID].AddService(&svc) + b.lock.Unlock() + } + } + } + b.initialized = true + } + + // For each stack ID, if we found the stack annotation, this means the service will be sharing the same stack with other services + // If so, we should reuse the same stack in the cache so that we can reuse the load balancer with shared listeners + stackID := core.StackID(k8s.NamespacedName(service)) + var stack core.Stack + stack = core.NewDefaultStack(stackID) + if service.Annotations[LoadBalancerAllocatingPortKey] == "true" { + // service will be allocated to a stack with shared loadbalancer + if service.Annotations[LoadBalancerStackKey] == "" { + return nil, nil, false, errors.Errorf("service %v/%v is waiting to allocated for a stack", service.Namespace, service.Name) + } + stackID = core.StackID(types.NamespacedName{ + Namespace: "stack", + Name: service.Annotations[LoadBalancerStackKey], + }) + b.lock.Lock() + if b.stackGlobalCache[stackID] == nil { + s := core.NewDefaultStack(stackID) + b.stackGlobalCache[stackID] = s + } + stack = b.stackGlobalCache[stackID] + b.lock.Unlock() + } task := &defaultModelBuildTask{ clusterName: b.clusterName, vpcID: b.vpcID, @@ -221,13 +286,63 @@ 
func (t *defaultModelBuildTask) run(ctx context.Context) error { return errors.Errorf("deletion_protection is enabled, cannot delete the service: %v", t.service.Name) } } + + t.cleanupStackWithRemovingService() return nil } + t.stack.AddService(t.service) err := t.buildModel(ctx) return err } +// When service is deleted, update resources in the stack to make sure things are cleaned up properly. +func (t *defaultModelBuildTask) cleanupStackWithRemovingService() { + for _, port := range t.service.Spec.Ports { + t.stack.RemoveResource(graph.ResourceUID{ + ResID: fmt.Sprintf("%v", port.NodePort), + ResType: reflect.TypeOf(&elbv2model.Listener{}), + }) + svcPort := intstr.FromInt(int(port.Port)) + tgResourceID := t.buildTargetGroupResourceID(k8s.NamespacedName(t.service), svcPort) + var targetGroups []*elbv2model.TargetGroup + var targetGroupBindingResources []*elbv2model.TargetGroupBindingResource + t.stack.ListResources(&targetGroups) + t.stack.ListResources(&targetGroupBindingResources) + for _, tg := range targetGroups { + if tg.ID() == tgResourceID { + t.stack.RemoveResource(graph.ResourceUID{ + ResID: tg.ID(), + ResType: reflect.TypeOf(tg), + }) + } + } + for _, tgBinding := range targetGroupBindingResources { + if tgBinding.ID() == tgResourceID { + t.stack.RemoveResource(graph.ResourceUID{ + ResID: tgBinding.ID(), + ResType: reflect.TypeOf(tgBinding), + }) + } + } + } + // Delete the load balancer if there is no listener left. + var resLSs []*elbv2model.Listener + t.stack.ListResources(&resLSs) + if len(resLSs) == 0 { + t.stack.RemoveResource(graph.ResourceUID{ + ResID: "LoadBalancer", + ResType: reflect.TypeOf(&elbv2model.LoadBalancer{}), + }) + t.loadBalancer = nil + } + t.stack.RemoveService(t.service) +} + func (t *defaultModelBuildTask) buildModel(ctx context.Context) error { + // always lock the stack building models. Since stack can be accessed by multiple goroutines from multiple service reconciliation loops, + // make sure only one goroutine is building the model at a time to guarantee thread safety. 
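Build resolves every service that carries the stack annotation to one shared, cached stack so that their listeners land on the same load balancer. A reduced sketch of that get-or-create path under a mutex (the cache value is simplified to a small struct here; the real code stores core.Stack keyed by core.StackID):

package main

import (
	"fmt"
	"sync"
)

// sharedStack stands in for core.Stack for the purpose of this sketch.
type sharedStack struct{ name string }

// stackCache is a trimmed-down version of the stackGlobalCache bookkeeping in
// defaultModelBuilder.Build: look up the shared stack for a stack name, or
// create it on first use, under a mutex because reconciles run concurrently.
type stackCache struct {
	mu     sync.Mutex
	stacks map[string]*sharedStack
}

func (c *stackCache) getOrCreate(name string) *sharedStack {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.stacks == nil {
		c.stacks = map[string]*sharedStack{}
	}
	if s, ok := c.stacks[name]; ok {
		return s
	}
	s := &sharedStack{name: name}
	c.stacks[name] = s
	return s
}

func main() {
	cache := &stackCache{}
	a := cache.getOrCreate("3f6d1c9a")
	b := cache.getOrCreate("3f6d1c9a")
	fmt.Println(a == b) // true: both services end up on the same stack
}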
+ t.stack.Lock() + defer t.stack.Unlock() scheme, err := t.buildLoadBalancerScheme(ctx) if err != nil { return err @@ -262,3 +377,14 @@ func (t *defaultModelBuildTask) getDeletionProtectionViaAnnotation(svc corev1.Se } return false, nil } + +func (t *defaultModelBuildTask) stackID() core.StackID { + stackID := core.StackID(k8s.NamespacedName(t.service)) + if t.service.Annotations[LoadBalancerStackKey] != "" { + stackID = core.StackID(types.NamespacedName{ + Namespace: "stack", + Name: t.service.Annotations[LoadBalancerStackKey], + }) + } + return stackID +} diff --git a/pkg/service/model_builder_test.go b/pkg/service/model_builder_test.go index 3bbe57fcf..ded7809d3 100644 --- a/pkg/service/model_builder_test.go +++ b/pkg/service/model_builder_test.go @@ -6417,7 +6417,7 @@ func Test_defaultModelBuilderTask_Build(t *testing.T) { } builder := NewDefaultModelBuilder(annotationParser, subnetsResolver, vpcInfoProvider, "vpc-xxx", trackingProvider, elbv2TaggingManager, ec2Client, featureGates, "my-cluster", nil, nil, "ELBSecurityPolicy-2016-08", defaultTargetType, enableIPTargetType, serviceUtils, - backendSGProvider, sgResolver, tt.enableBackendSG, tt.disableRestrictedSGRules) + backendSGProvider, sgResolver, tt.enableBackendSG, tt.disableRestrictedSGRules, nil) ctx := context.Background() stack, _, _, err := builder.Build(ctx, tt.svc) if tt.wantError { diff --git a/pkg/service/service_utils.go b/pkg/service/service_utils.go index f367e2579..7cfa86097 100644 --- a/pkg/service/service_utils.go +++ b/pkg/service/service_utils.go @@ -1,6 +1,8 @@ package service import ( + "fmt" + corev1 "k8s.io/api/core/v1" "sigs.k8s.io/aws-load-balancer-controller/pkg/annotations" "sigs.k8s.io/aws-load-balancer-controller/pkg/config" @@ -14,6 +16,8 @@ type ServiceUtils interface { // IsServicePendingFinalization returns true if the service contains the aws-load-balancer-controller finalizer IsServicePendingFinalization(service *corev1.Service) bool + + GetServiceStackName(service *corev1.Service) string } func NewServiceUtils(annotationsParser annotations.Parser, serviceFinalizer string, loadBalancerClass string, @@ -61,6 +65,13 @@ func (u *defaultServiceUtils) IsServiceSupported(service *corev1.Service) bool { return u.checkAWSLoadBalancerTypeAnnotation(service) } +func (u *defaultServiceUtils) GetServiceStackName(service *corev1.Service) string { + if service.Annotations[LoadBalancerStackKey] != "" { + return fmt.Sprintf("k8s-%.8s", service.Annotations[LoadBalancerStackKey]) + } + return "" +} + func (u *defaultServiceUtils) checkAWSLoadBalancerTypeAnnotation(service *corev1.Service) bool { lbType := "" _ = u.annotationParser.ParseStringAnnotation(annotations.SvcLBSuffixLoadBalancerType, &lbType, service.Annotations)
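GetServiceStackName above derives the shared load balancer and managed security group name from the stack annotation; the %.8s verb keeps only the first eight characters of the UUID, which stays far below the 32-character load balancer name limit noted in buildLoadBalancerName. A quick illustration (the UUID is made up):

package main

import "fmt"

func main() {
	// Mirrors defaultServiceUtils.GetServiceStackName: prefix the first eight
	// characters of the stack annotation with "k8s-".
	stackAnnotation := "3f6d1c9a-5b1e-4c77-9c1f-0a9a1example" // illustrative UUID
	fmt.Printf("k8s-%.8s\n", stackAnnotation)                 // k8s-3f6d1c9a
}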