Skip to content

Commit

Permalink
cherry picking intuit changes for LB migration
Browse files Browse the repository at this point in the history
Signed-off-by: Viraj Kulkarni <[email protected]>
  • Loading branch information
Viraj Kulkarni committed Feb 24, 2025
1 parent 1ee9879 commit 55d00f6
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 30 deletions.
52 changes: 38 additions & 14 deletions admiral/pkg/clusters/admiralDatabaseClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"errors"
"fmt"
"github.com/istio-ecosystem/admiral/admiral/apis/v1"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/admiral"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/util"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
"io/ioutil"
Expand Down Expand Up @@ -207,49 +209,71 @@ func ReadAndUpdateSyncAdmiralConfig(rr *RemoteRegistry) error {
return errors.New(fmt.Sprintf("task=%s, failed to parse DynamicConfigData", common.DynamicConfigUpdate))
}

if IsCacheWarmupTime(rr) {
log.Infof("task=%s, NeedToUpdateConfig=false, processing skipped during cache warm up state", common.DynamicConfigUpdate)
return nil
}

if IsDynamicConfigChanged(configData) {
log.Infof(fmt.Sprintf("task=%s, updating DynamicConfigData with Admiral config", common.DynamicConfigUpdate))
log.Infof(fmt.Sprintf("task=%s, NeedToUpdateConfig=true", common.DynamicConfigUpdate))
UpdateSyncAdmiralConfig(configData)

ctx := context.Context(context.Background())
//Process NLB Cluster
processLBMigration(ctx, rr, common.GetAdmiralParams().NLBEnabledClusters, &rr.AdmiralCache.NLBEnabledCluster, common.GetAdmiralParams().NLBIngressLabel)
//Process CLB Cluster
processLBMigration(ctx, rr, common.GetAdmiralParams().CLBEnabledClusters, &rr.AdmiralCache.CLBEnabledCluster, common.GetAdmiralParams().LabelSet.GatewayApp)
} else {
log.Infof(fmt.Sprintf("task=%s, no need to update DynamicConfigData", common.DynamicConfigUpdate))
log.Infof(fmt.Sprintf("task=%s, NeedToUpdateConfig=false", common.DynamicConfigUpdate))
}

return nil
}

func processLBMigration(ctx context.Context, rr *RemoteRegistry, updatedLBs []string, existingCache *[]string, lbLabel string) {

log.Infof("task=%s, Processing LB migration for %s. UpdateReceived=%s, ExistingCache=%s, ", common.LBUpdateProcessor, lbLabel, updatedLBs, existingCache)
ctxLogger := log.WithField("task", common.LBUpdateProcessor)

for _, cluster := range getLBToProcess(updatedLBs, existingCache) {
clusterToProcess := getLBToProcess(updatedLBs, existingCache)
ctxLogger.Infof("ClusterToProccess=%s, LBLabel=%s", clusterToProcess, lbLabel)

for _, cluster := range clusterToProcess {
err := isServiceControllerInitialized(rr.remoteControllers[cluster])
if err == nil {
for _, fetchService := range rr.remoteControllers[cluster].ServiceController.Cache.Get(common.NamespaceIstioSystem) {
if fetchService.Labels[common.App] == lbLabel {
log.Infof("task=%s, Cluster=%s, Processing LB migration for Cluster.", common.LBUpdateProcessor, cluster)
go handleServiceEventForDeployment(ctx, fetchService, rr, cluster, rr.GetRemoteController(cluster).DeploymentController, rr.GetRemoteController(cluster).ServiceController, HandleEventForDeployment)
go handleServiceEventForRollout(ctx, fetchService, rr, cluster, rr.GetRemoteController(cluster).RolloutController, rr.GetRemoteController(cluster).ServiceController, HandleEventForRollout)
start := time.Now()
ctxLogger.Infof("Cluster=%s, Processing LB migration for Cluster.", cluster)

//Trigger Service event explicitly for migration
ctx = context.WithValue(ctx, common.EventType, admiral.Update)
err := handleEventForService(ctx, fetchService, rr, cluster)
if err != nil {
util.LogElapsedTimeSinceTask(ctxLogger, common.LBUpdateProcessor,
lbLabel, "", cluster, "Error="+err.Error(), start)
} else {
util.LogElapsedTimeSinceTask(ctxLogger, common.LBUpdateProcessor,
lbLabel, "", cluster, "Completed", start)
}
}
}
} else {
log.Infof("task=%s, Cluster=%s, Service Controller not initializ. Skipped LB migration for Cluster.", common.LBUpdateProcessor, cluster)
ctxLogger.Infof("Cluster=%s, Service Controller not initialize. Skipped LB migration for Cluster. Err: %s", cluster, err.Error())
}
}
}

func getLBToProcess(updatedLB []string, cache *[]string) []string {

var clusersToProcess []string
clusersToProcess := make([]string, 0)
if cache == nil || len(*cache) == 0 {
*cache = updatedLB
return updatedLB
}

if len(updatedLB) == 0 {
clusersToProcess = *cache
*cache = updatedLB
return clusersToProcess
}
//Validate if New ClusterAdded
for _, clusterFromAdmiralParam := range updatedLB {
if !slices.Contains(*cache, clusterFromAdmiralParam) {
Expand All @@ -259,13 +283,13 @@ func getLBToProcess(updatedLB []string, cache *[]string) []string {
}

//Validate if cluster Removed
for i, clusterFromCache := range *cache {
for _, clusterFromCache := range *cache {
if !slices.Contains(updatedLB, clusterFromCache) {
clusersToProcess = append(clusersToProcess, clusterFromCache)
*cache = slices.Delete(*cache, i, i+1)
}
}

//Final dynamoDB data as cache
*cache = updatedLB
return clusersToProcess
}

Expand Down
8 changes: 8 additions & 0 deletions admiral/pkg/clusters/admiralDatabaseClient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,12 +710,20 @@ func Test_getLBToProcess(t *testing.T) {
args args
want []string
}{
{"When cache is not updated and update config is also empty", args{updatedLB: []string{}, cache: &[]string{}}, []string{}},
{"When cache is not updated and update config is has one item", args{updatedLB: []string{"cluster1"}, cache: &[]string{}}, []string{"cluster1"}},
{"When cache has single item and update config removed that item", args{updatedLB: []string{}, cache: &[]string{"cluster1"}}, []string{"cluster1"}},
{"When cache is not updated then getLBToProcess should be all updated list ",
args{updatedLB: []string{"cluster1", "cluster2"}, cache: &[]string{}}, []string{"cluster1", "cluster2"}},
{"When cluster is removed from update list then getLBToProcess should return removed cluster",
args{updatedLB: []string{"cluster1", "cluster2"}, cache: &[]string{"cluster1", "cluster2", "cluster3"}}, []string{"cluster3"}},
{"When cluster is added from update list then getLBToProcess should return added cluster",
args{updatedLB: []string{"cluster1", "cluster2", "cluster3"}, cache: &[]string{"cluster1", "cluster2"}}, []string{"cluster3"}},
{"When object is removed and cache become empty", args{updatedLB: []string{}, cache: &[]string{"cluster1", "cluster2"}}, []string{"cluster1", "cluster2"}},
{"When cache and config is same then expect no value to be returned",
args{updatedLB: []string{"cluster1", "cluster2"}, cache: &[]string{"cluster1", "cluster2"}}, []string{}},
{"When config is adding and removing cluster at same time",
args{updatedLB: []string{"cluster2"}, cache: &[]string{"cluster1"}}, []string{"cluster2", "cluster1"}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions admiral/pkg/clusters/service_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func handleServiceEventForDeployment(
return fmt.Errorf(AlertLogMsg, ctx.Value(common.EventType))
}

if common.IsIstioIngressGatewayService(svc, common.GetAdmiralParams().LabelSet.GatewayApp) {
if common.IsIstioIngressGatewayService(svc, common.GetAdmiralParams().LabelSet.GatewayApp) || common.IsIstioIngressGatewayService(svc, common.GetAdmiralParams().NLBIngressLabel) {
// The eventType is overridden to admiral.Update. This is mainly
// for admiral.Delete events sent for the ingress in the cluster
// else it would delete all the SEs in the source and dependent clusters
Expand Down Expand Up @@ -164,7 +164,7 @@ func handleServiceEventForRollout(
return fmt.Errorf(AlertLogMsg, ctx.Value(common.EventType))
}

if common.IsIstioIngressGatewayService(svc, common.GetAdmiralParams().LabelSet.GatewayApp) {
if common.IsIstioIngressGatewayService(svc, common.GetAdmiralParams().LabelSet.GatewayApp) || common.IsIstioIngressGatewayService(svc, common.GetAdmiralParams().NLBIngressLabel) {
// The eventType is overridden to admiral.Update. This is mainly
// for admiral.Delete events sent for the ingress in the cluster
// else it would delete all the SEs in the source and dependent clusters
Expand Down
33 changes: 29 additions & 4 deletions admiral/pkg/clusters/serviceentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,17 +876,42 @@ func getOverwrittenLoadBalancer(ctx *logrus.Entry, rc *RemoteController, cluster
return common.DummyAdmiralGlobal, common.DefaultMtlsPort, err
}

ctx = ctx.WithFields(logrus.Fields{
"task": common.LBUpdateProcessor,
"clusterName": clusterName,
})

endpoint, port := rc.ServiceController.Cache.GetSingleLoadBalancer(common.GetAdmiralParams().LabelSet.GatewayApp, common.NamespaceIstioSystem)

ctx = ctx.WithFields(logrus.Fields{
"RegularLB": endpoint,
"Port": port,
})

//Overwrite for NLB
if slices.Contains(admiralCache.NLBEnabledCluster, clusterName) {
ctx = ctx.WithField("task", common.LBUpdateProcessor)
overwriteEndpoint, overwritePort := rc.ServiceController.Cache.GetSingleLoadBalancer(common.GetAdmiralParams().NLBIngressLabel, common.NamespaceIstioSystem)
//Validate if provided LB information is not default dummy, If Dummy return CLB
ctx = ctx.WithFields(logrus.Fields{
"OverwritenLB": overwriteEndpoint,
"Port": overwritePort,
"OverwrittenLabel": common.GetAdmiralParams().NLBIngressLabel,
})

//Validate if provided LB information is not default dummy, If Dummy then coutinue default LB
if len(overwriteEndpoint) > 0 && overwritePort > 0 && overwriteEndpoint != common.DummyAdmiralGlobal {
ctx.Info("Overwriting LB:", endpoint, ", port:", port, ", clusterName:", clusterName)
return overwriteEndpoint, port, nil
ctx = ctx.WithFields(logrus.Fields{
"Overwritten": true,
})
ctx.Info("")
return overwriteEndpoint, overwritePort, nil
}

ctx = ctx.WithFields(logrus.Fields{
"Overwritten": false,
})
}

ctx.Info("")
return endpoint, port, nil
}

Expand Down
9 changes: 0 additions & 9 deletions admiral/pkg/clusters/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,15 +216,6 @@ func NewRemoteRegistry(ctx context.Context, params common.AdmiralParams) *Remote
rr.AdmiralCache.ClusterLocalityCache = common.NewMapOfMaps()
}

/*
Load data from dynamoDB before async process starts.
This is done to avoid any transitive state where component starts with outofsync config.
Later down the process like async processor will take on going config pushes.
*/
if common.IsAdmiralDynamicConfigEnabled() {
ReadAndUpdateSyncAdmiralConfig(rr)
}

return rr
}

Expand Down
2 changes: 1 addition & 1 deletion admiral/pkg/controller/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (
NamespaceKubeSystem = "kube-system"
NamespaceIstioSystem = "istio-system"
IstioIngressGatewayLabelValue = "istio-ingressgateway"
NLBIstioIngressGatewayLabelValue = "nlb-istio-ingressgateway"
NLBIstioIngressGatewayLabelValue = "istio-ingressgateway-nlb"
Env = "env"
DefaultMtlsPort = 15443
DefaultServiceEntryPort = 80
Expand Down

0 comments on commit 55d00f6

Please sign in to comment.