diff --git a/admiral/pkg/clusters/admiralDatabaseClient.go b/admiral/pkg/clusters/admiralDatabaseClient.go index a8c550d6..e476e596 100644 --- a/admiral/pkg/clusters/admiralDatabaseClient.go +++ b/admiral/pkg/clusters/admiralDatabaseClient.go @@ -6,7 +6,9 @@ import ( "errors" "fmt" "github.com/istio-ecosystem/admiral/admiral/apis/v1" + "github.com/istio-ecosystem/admiral/admiral/pkg/controller/admiral" "github.com/istio-ecosystem/admiral/admiral/pkg/controller/common" + "github.com/istio-ecosystem/admiral/admiral/pkg/controller/util" log "github.com/sirupsen/logrus" "gopkg.in/yaml.v2" "io/ioutil" @@ -207,17 +209,20 @@ func ReadAndUpdateSyncAdmiralConfig(rr *RemoteRegistry) error { return errors.New(fmt.Sprintf("task=%s, failed to parse DynamicConfigData", common.DynamicConfigUpdate)) } + if IsCacheWarmupTime(rr) { + log.Infof("task=%s, NeedToUpdateConfig=false, processing skipped during cache warm up state", common.DynamicConfigUpdate) + return nil + } + if IsDynamicConfigChanged(configData) { - log.Infof(fmt.Sprintf("task=%s, updating DynamicConfigData with Admiral config", common.DynamicConfigUpdate)) + log.Infof(fmt.Sprintf("task=%s, NeedToUpdateConfig=true", common.DynamicConfigUpdate)) UpdateSyncAdmiralConfig(configData) ctx := context.Context(context.Background()) //Process NLB Cluster processLBMigration(ctx, rr, common.GetAdmiralParams().NLBEnabledClusters, &rr.AdmiralCache.NLBEnabledCluster, common.GetAdmiralParams().NLBIngressLabel) - //Process CLB Cluster - processLBMigration(ctx, rr, common.GetAdmiralParams().CLBEnabledClusters, &rr.AdmiralCache.CLBEnabledCluster, common.GetAdmiralParams().LabelSet.GatewayApp) } else { - log.Infof(fmt.Sprintf("task=%s, no need to update DynamicConfigData", common.DynamicConfigUpdate)) + log.Infof(fmt.Sprintf("task=%s, NeedToUpdateConfig=false", common.DynamicConfigUpdate)) } return nil @@ -225,31 +230,50 @@ func ReadAndUpdateSyncAdmiralConfig(rr *RemoteRegistry) error { func processLBMigration(ctx context.Context, rr 
*RemoteRegistry, updatedLBs []string, existingCache *[]string, lbLabel string) { - log.Infof("task=%s, Processing LB migration for %s. UpdateReceived=%s, ExistingCache=%s, ", common.LBUpdateProcessor, lbLabel, updatedLBs, existingCache) + ctxLogger := log.WithField("task", common.LBUpdateProcessor) - for _, cluster := range getLBToProcess(updatedLBs, existingCache) { + clusterToProcess := getLBToProcess(updatedLBs, existingCache) + ctxLogger.Infof("ClusterToProcess=%s, LBLabel=%s", clusterToProcess, lbLabel) + + for _, cluster := range clusterToProcess { err := isServiceControllerInitialized(rr.remoteControllers[cluster]) if err == nil { for _, fetchService := range rr.remoteControllers[cluster].ServiceController.Cache.Get(common.NamespaceIstioSystem) { if fetchService.Labels[common.App] == lbLabel { - log.Infof("task=%s, Cluster=%s, Processing LB migration for Cluster.", common.LBUpdateProcessor, cluster) - go handleServiceEventForDeployment(ctx, fetchService, rr, cluster, rr.GetRemoteController(cluster).DeploymentController, rr.GetRemoteController(cluster).ServiceController, HandleEventForDeployment) - go handleServiceEventForRollout(ctx, fetchService, rr, cluster, rr.GetRemoteController(cluster).RolloutController, rr.GetRemoteController(cluster).ServiceController, HandleEventForRollout) + start := time.Now() + ctxLogger.Infof("Cluster=%s, Processing LB migration for Cluster.", cluster) + + //Trigger Service event explicitly for migration + ctx = context.WithValue(ctx, common.EventType, admiral.Update) + err := handleEventForService(ctx, fetchService, rr, cluster) + if err != nil { + util.LogElapsedTimeSinceTask(ctxLogger, common.LBUpdateProcessor, + lbLabel, "", cluster, "Error="+err.Error(), start) + } else { + util.LogElapsedTimeSinceTask(ctxLogger, common.LBUpdateProcessor, + lbLabel, "", cluster, "Completed", start) + } } } } else { - log.Infof("task=%s, Cluster=%s, Service Controller not initializ. 
Skipped LB migration for Cluster.", common.LBUpdateProcessor, cluster) + ctxLogger.Infof("Cluster=%s, Service Controller not initialized. Skipped LB migration for Cluster. Err: %s", cluster, err.Error()) } } } func getLBToProcess(updatedLB []string, cache *[]string) []string { - var clusersToProcess []string + clusersToProcess := make([]string, 0) if cache == nil || len(*cache) == 0 { *cache = updatedLB return updatedLB } + + if len(updatedLB) == 0 { + clusersToProcess = *cache + *cache = updatedLB + return clusersToProcess + } //Validate if New ClusterAdded for _, clusterFromAdmiralParam := range updatedLB { if !slices.Contains(*cache, clusterFromAdmiralParam) { @@ -259,13 +283,13 @@ func getLBToProcess(updatedLB []string, cache *[]string) []string { } //Validate if cluster Removed - for i, clusterFromCache := range *cache { + for _, clusterFromCache := range *cache { if !slices.Contains(updatedLB, clusterFromCache) { clusersToProcess = append(clusersToProcess, clusterFromCache) - *cache = slices.Delete(*cache, i, i+1) } } - + //Final dynamoDB data as cache + *cache = updatedLB return clusersToProcess } diff --git a/admiral/pkg/clusters/admiralDatabaseClient_test.go b/admiral/pkg/clusters/admiralDatabaseClient_test.go index 02fa06b6..1b2bfa1f 100644 --- a/admiral/pkg/clusters/admiralDatabaseClient_test.go +++ b/admiral/pkg/clusters/admiralDatabaseClient_test.go @@ -710,12 +710,20 @@ func Test_getLBToProcess(t *testing.T) { args args want []string }{ + {"When cache is not updated and update config is also empty", args{updatedLB: []string{}, cache: &[]string{}}, []string{}}, + {"When cache is not updated and update config has one item", args{updatedLB: []string{"cluster1"}, cache: &[]string{}}, []string{"cluster1"}}, + {"When cache has single item and update config removed that item", args{updatedLB: []string{}, cache: &[]string{"cluster1"}}, []string{"cluster1"}}, {"When cache is not updated then getLBToProcess should be all updated list ", args{updatedLB: 
[]string{"cluster1", "cluster2"}, cache: &[]string{}}, []string{"cluster1", "cluster2"}}, {"When cluster is removed from update list then getLBToProcess should return removed cluster", args{updatedLB: []string{"cluster1", "cluster2"}, cache: &[]string{"cluster1", "cluster2", "cluster3"}}, []string{"cluster3"}}, {"When cluster is added from update list then getLBToProcess should return added cluster", args{updatedLB: []string{"cluster1", "cluster2", "cluster3"}, cache: &[]string{"cluster1", "cluster2"}}, []string{"cluster3"}}, + {"When object is removed and cache become empty", args{updatedLB: []string{}, cache: &[]string{"cluster1", "cluster2"}}, []string{"cluster1", "cluster2"}}, + {"When cache and config is same then expect no value to be returned", + args{updatedLB: []string{"cluster1", "cluster2"}, cache: &[]string{"cluster1", "cluster2"}}, []string{}}, + {"When config is adding and removing cluster at same time", + args{updatedLB: []string{"cluster2"}, cache: &[]string{"cluster1"}}, []string{"cluster2", "cluster1"}}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/admiral/pkg/clusters/service_handler.go b/admiral/pkg/clusters/service_handler.go index ff33baff..f5f32a8c 100644 --- a/admiral/pkg/clusters/service_handler.go +++ b/admiral/pkg/clusters/service_handler.go @@ -99,7 +99,7 @@ func handleServiceEventForDeployment( return fmt.Errorf(AlertLogMsg, ctx.Value(common.EventType)) } - if common.IsIstioIngressGatewayService(svc, common.GetAdmiralParams().LabelSet.GatewayApp) { + if common.IsIstioIngressGatewayService(svc, common.GetAdmiralParams().LabelSet.GatewayApp) || common.IsIstioIngressGatewayService(svc, common.GetAdmiralParams().NLBIngressLabel) { // The eventType is overridden to admiral.Update. 
This is mainly // for admiral.Delete events sent for the ingress in the cluster // else it would delete all the SEs in the source and dependent clusters @@ -164,7 +164,7 @@ func handleServiceEventForRollout( return fmt.Errorf(AlertLogMsg, ctx.Value(common.EventType)) } - if common.IsIstioIngressGatewayService(svc, common.GetAdmiralParams().LabelSet.GatewayApp) { + if common.IsIstioIngressGatewayService(svc, common.GetAdmiralParams().LabelSet.GatewayApp) || common.IsIstioIngressGatewayService(svc, common.GetAdmiralParams().NLBIngressLabel) { // The eventType is overridden to admiral.Update. This is mainly // for admiral.Delete events sent for the ingress in the cluster // else it would delete all the SEs in the source and dependent clusters diff --git a/admiral/pkg/clusters/serviceentry.go b/admiral/pkg/clusters/serviceentry.go index d4a57af5..a7809a6c 100644 --- a/admiral/pkg/clusters/serviceentry.go +++ b/admiral/pkg/clusters/serviceentry.go @@ -876,17 +876,42 @@ func getOverwrittenLoadBalancer(ctx *logrus.Entry, rc *RemoteController, cluster return common.DummyAdmiralGlobal, common.DefaultMtlsPort, err } + ctx = ctx.WithFields(logrus.Fields{ + "task": common.LBUpdateProcessor, + "clusterName": clusterName, + }) + endpoint, port := rc.ServiceController.Cache.GetSingleLoadBalancer(common.GetAdmiralParams().LabelSet.GatewayApp, common.NamespaceIstioSystem) + ctx = ctx.WithFields(logrus.Fields{ + "RegularLB": endpoint, + "Port": port, + }) + + //Overwrite for NLB if slices.Contains(admiralCache.NLBEnabledCluster, clusterName) { - ctx = ctx.WithField("task", common.LBUpdateProcessor) overwriteEndpoint, overwritePort := rc.ServiceController.Cache.GetSingleLoadBalancer(common.GetAdmiralParams().NLBIngressLabel, common.NamespaceIstioSystem) - //Validate if provided LB information is not default dummy, If Dummy return CLB + ctx = ctx.WithFields(logrus.Fields{ + "OverwritenLB": overwriteEndpoint, + "Port": overwritePort, + "OverwrittenLabel": 
common.GetAdmiralParams().NLBIngressLabel, + }) + + //Validate if provided LB information is not default dummy, If Dummy then continue default LB if len(overwriteEndpoint) > 0 && overwritePort > 0 && overwriteEndpoint != common.DummyAdmiralGlobal { - ctx.Info("Overwriting LB:", endpoint, ", port:", port, ", clusterName:", clusterName) - return overwriteEndpoint, port, nil + ctx = ctx.WithFields(logrus.Fields{ + "Overwritten": true, + }) + ctx.Info("") + return overwriteEndpoint, overwritePort, nil } + + ctx = ctx.WithFields(logrus.Fields{ + "Overwritten": false, + }) } + + ctx.Info("") return endpoint, port, nil } diff --git a/admiral/pkg/clusters/types.go b/admiral/pkg/clusters/types.go index af75636b..8af68aca 100644 --- a/admiral/pkg/clusters/types.go +++ b/admiral/pkg/clusters/types.go @@ -216,15 +216,6 @@ func NewRemoteRegistry(ctx context.Context, params common.AdmiralParams) *Remote rr.AdmiralCache.ClusterLocalityCache = common.NewMapOfMaps() } - /* - Load data from dynamoDB before async process starts. - This is done to avoid any transitive state where component starts with outofsync config. - Later down the process like async processor will take on going config pushes. - */ - if common.IsAdmiralDynamicConfigEnabled() { - ReadAndUpdateSyncAdmiralConfig(rr) - } - return rr } diff --git a/admiral/pkg/controller/common/common.go b/admiral/pkg/controller/common/common.go index c0bc3f1b..a14a1cad 100644 --- a/admiral/pkg/controller/common/common.go +++ b/admiral/pkg/controller/common/common.go @@ -41,7 +41,7 @@ const ( NamespaceKubeSystem = "kube-system" NamespaceIstioSystem = "istio-system" IstioIngressGatewayLabelValue = "istio-ingressgateway" - NLBIstioIngressGatewayLabelValue = "nlb-istio-ingressgateway" + NLBIstioIngressGatewayLabelValue = "istio-ingressgateway-nlb" Env = "env" DefaultMtlsPort = 15443 DefaultServiceEntryPort = 80