Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherry picking intuit changes for LB migration #391

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 38 additions & 14 deletions admiral/pkg/clusters/admiralDatabaseClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"errors"
"fmt"
"github.com/istio-ecosystem/admiral/admiral/apis/v1"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/admiral"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/util"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
"io/ioutil"
Expand Down Expand Up @@ -207,49 +209,71 @@ func ReadAndUpdateSyncAdmiralConfig(rr *RemoteRegistry) error {
return errors.New(fmt.Sprintf("task=%s, failed to parse DynamicConfigData", common.DynamicConfigUpdate))
}

if IsCacheWarmupTime(rr) {
log.Infof("task=%s, NeedToUpdateConfig=false, processing skipped during cache warm up state", common.DynamicConfigUpdate)
return nil
}

if IsDynamicConfigChanged(configData) {
log.Infof(fmt.Sprintf("task=%s, updating DynamicConfigData with Admiral config", common.DynamicConfigUpdate))
log.Infof(fmt.Sprintf("task=%s, NeedToUpdateConfig=true", common.DynamicConfigUpdate))
UpdateSyncAdmiralConfig(configData)

ctx := context.Context(context.Background())
//Process NLB Cluster
processLBMigration(ctx, rr, common.GetAdmiralParams().NLBEnabledClusters, &rr.AdmiralCache.NLBEnabledCluster, common.GetAdmiralParams().NLBIngressLabel)
//Process CLB Cluster
processLBMigration(ctx, rr, common.GetAdmiralParams().CLBEnabledClusters, &rr.AdmiralCache.CLBEnabledCluster, common.GetAdmiralParams().LabelSet.GatewayApp)
} else {
log.Infof(fmt.Sprintf("task=%s, no need to update DynamicConfigData", common.DynamicConfigUpdate))
log.Infof(fmt.Sprintf("task=%s, NeedToUpdateConfig=false", common.DynamicConfigUpdate))
}

return nil
}

func processLBMigration(ctx context.Context, rr *RemoteRegistry, updatedLBs []string, existingCache *[]string, lbLabel string) {

log.Infof("task=%s, Processing LB migration for %s. UpdateReceived=%s, ExistingCache=%s, ", common.LBUpdateProcessor, lbLabel, updatedLBs, existingCache)
ctxLogger := log.WithField("task", common.LBUpdateProcessor)

for _, cluster := range getLBToProcess(updatedLBs, existingCache) {
clusterToProcess := getLBToProcess(updatedLBs, existingCache)
ctxLogger.Infof("ClusterToProccess=%s, LBLabel=%s", clusterToProcess, lbLabel)

for _, cluster := range clusterToProcess {
err := isServiceControllerInitialized(rr.remoteControllers[cluster])
if err == nil {
for _, fetchService := range rr.remoteControllers[cluster].ServiceController.Cache.Get(common.NamespaceIstioSystem) {
if fetchService.Labels[common.App] == lbLabel {
log.Infof("task=%s, Cluster=%s, Processing LB migration for Cluster.", common.LBUpdateProcessor, cluster)
go handleServiceEventForDeployment(ctx, fetchService, rr, cluster, rr.GetRemoteController(cluster).DeploymentController, rr.GetRemoteController(cluster).ServiceController, HandleEventForDeployment)
go handleServiceEventForRollout(ctx, fetchService, rr, cluster, rr.GetRemoteController(cluster).RolloutController, rr.GetRemoteController(cluster).ServiceController, HandleEventForRollout)
start := time.Now()
ctxLogger.Infof("Cluster=%s, Processing LB migration for Cluster.", cluster)

//Trigger Service event explicitly for migration
ctx = context.WithValue(ctx, common.EventType, admiral.Update)
err := handleEventForService(ctx, fetchService, rr, cluster)
if err != nil {
util.LogElapsedTimeSinceTask(ctxLogger, common.LBUpdateProcessor,
lbLabel, "", cluster, "Error="+err.Error(), start)
} else {
util.LogElapsedTimeSinceTask(ctxLogger, common.LBUpdateProcessor,
lbLabel, "", cluster, "Completed", start)
}
}
}
} else {
log.Infof("task=%s, Cluster=%s, Service Controller not initializ. Skipped LB migration for Cluster.", common.LBUpdateProcessor, cluster)
ctxLogger.Infof("Cluster=%s, Service Controller not initialize. Skipped LB migration for Cluster. Err: %s", cluster, err.Error())
}
}
}

func getLBToProcess(updatedLB []string, cache *[]string) []string {

var clusersToProcess []string
clusersToProcess := make([]string, 0)
if cache == nil || len(*cache) == 0 {
*cache = updatedLB
return updatedLB
}

if len(updatedLB) == 0 {
clusersToProcess = *cache
*cache = updatedLB
return clusersToProcess
}
//Validate if New ClusterAdded
for _, clusterFromAdmiralParam := range updatedLB {
if !slices.Contains(*cache, clusterFromAdmiralParam) {
Expand All @@ -259,13 +283,13 @@ func getLBToProcess(updatedLB []string, cache *[]string) []string {
}

//Validate if cluster Removed
for i, clusterFromCache := range *cache {
for _, clusterFromCache := range *cache {
if !slices.Contains(updatedLB, clusterFromCache) {
clusersToProcess = append(clusersToProcess, clusterFromCache)
*cache = slices.Delete(*cache, i, i+1)
}
}

//Final dynamoDB data as cache
*cache = updatedLB
return clusersToProcess
}

Expand Down
8 changes: 8 additions & 0 deletions admiral/pkg/clusters/admiralDatabaseClient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,12 +710,20 @@ func Test_getLBToProcess(t *testing.T) {
args args
want []string
}{
{"When cache is not updated and update config is also empty", args{updatedLB: []string{}, cache: &[]string{}}, []string{}},
{"When cache is not updated and update config is has one item", args{updatedLB: []string{"cluster1"}, cache: &[]string{}}, []string{"cluster1"}},
{"When cache has single item and update config removed that item", args{updatedLB: []string{}, cache: &[]string{"cluster1"}}, []string{"cluster1"}},
{"When cache is not updated then getLBToProcess should be all updated list ",
args{updatedLB: []string{"cluster1", "cluster2"}, cache: &[]string{}}, []string{"cluster1", "cluster2"}},
{"When cluster is removed from update list then getLBToProcess should return removed cluster",
args{updatedLB: []string{"cluster1", "cluster2"}, cache: &[]string{"cluster1", "cluster2", "cluster3"}}, []string{"cluster3"}},
{"When cluster is added from update list then getLBToProcess should return added cluster",
args{updatedLB: []string{"cluster1", "cluster2", "cluster3"}, cache: &[]string{"cluster1", "cluster2"}}, []string{"cluster3"}},
{"When object is removed and cache become empty", args{updatedLB: []string{}, cache: &[]string{"cluster1", "cluster2"}}, []string{"cluster1", "cluster2"}},
{"When cache and config is same then expect no value to be returned",
args{updatedLB: []string{"cluster1", "cluster2"}, cache: &[]string{"cluster1", "cluster2"}}, []string{}},
{"When config is adding and removing cluster at same time",
args{updatedLB: []string{"cluster2"}, cache: &[]string{"cluster1"}}, []string{"cluster2", "cluster1"}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions admiral/pkg/clusters/service_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func handleServiceEventForDeployment(
return fmt.Errorf(AlertLogMsg, ctx.Value(common.EventType))
}

if common.IsIstioIngressGatewayService(svc, common.GetAdmiralParams().LabelSet.GatewayApp) {
if common.IsIstioIngressGatewayService(svc, common.GetAdmiralParams().LabelSet.GatewayApp) || common.IsIstioIngressGatewayService(svc, common.GetAdmiralParams().NLBIngressLabel) {
// The eventType is overridden to admiral.Update. This is mainly
// for admiral.Delete events sent for the ingress in the cluster
// else it would delete all the SEs in the source and dependent clusters
Expand Down Expand Up @@ -164,7 +164,7 @@ func handleServiceEventForRollout(
return fmt.Errorf(AlertLogMsg, ctx.Value(common.EventType))
}

if common.IsIstioIngressGatewayService(svc, common.GetAdmiralParams().LabelSet.GatewayApp) {
if common.IsIstioIngressGatewayService(svc, common.GetAdmiralParams().LabelSet.GatewayApp) || common.IsIstioIngressGatewayService(svc, common.GetAdmiralParams().NLBIngressLabel) {
// The eventType is overridden to admiral.Update. This is mainly
// for admiral.Delete events sent for the ingress in the cluster
// else it would delete all the SEs in the source and dependent clusters
Expand Down
33 changes: 29 additions & 4 deletions admiral/pkg/clusters/serviceentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,17 +876,42 @@ func getOverwrittenLoadBalancer(ctx *logrus.Entry, rc *RemoteController, cluster
return common.DummyAdmiralGlobal, common.DefaultMtlsPort, err
}

ctx = ctx.WithFields(logrus.Fields{
"task": common.LBUpdateProcessor,
"clusterName": clusterName,
})

endpoint, port := rc.ServiceController.Cache.GetSingleLoadBalancer(common.GetAdmiralParams().LabelSet.GatewayApp, common.NamespaceIstioSystem)

ctx = ctx.WithFields(logrus.Fields{
"RegularLB": endpoint,
"Port": port,
})

//Overwrite for NLB
if slices.Contains(admiralCache.NLBEnabledCluster, clusterName) {
ctx = ctx.WithField("task", common.LBUpdateProcessor)
overwriteEndpoint, overwritePort := rc.ServiceController.Cache.GetSingleLoadBalancer(common.GetAdmiralParams().NLBIngressLabel, common.NamespaceIstioSystem)
//Validate if provided LB information is not default dummy, If Dummy return CLB
ctx = ctx.WithFields(logrus.Fields{
"OverwritenLB": overwriteEndpoint,
"Port": overwritePort,
"OverwrittenLabel": common.GetAdmiralParams().NLBIngressLabel,
})

//Validate if provided LB information is not default dummy, If Dummy then coutinue default LB
if len(overwriteEndpoint) > 0 && overwritePort > 0 && overwriteEndpoint != common.DummyAdmiralGlobal {
ctx.Info("Overwriting LB:", endpoint, ", port:", port, ", clusterName:", clusterName)
return overwriteEndpoint, port, nil
ctx = ctx.WithFields(logrus.Fields{
"Overwritten": true,
})
ctx.Info("")
return overwriteEndpoint, overwritePort, nil
}

ctx = ctx.WithFields(logrus.Fields{
"Overwritten": false,
})
}

ctx.Info("")
return endpoint, port, nil
}

Expand Down
9 changes: 0 additions & 9 deletions admiral/pkg/clusters/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,15 +216,6 @@ func NewRemoteRegistry(ctx context.Context, params common.AdmiralParams) *Remote
rr.AdmiralCache.ClusterLocalityCache = common.NewMapOfMaps()
}

/*
Load data from dynamoDB before async process starts.
This is done to avoid any transitive state where component starts with outofsync config.
Later down the process like async processor will take on going config pushes.
*/
if common.IsAdmiralDynamicConfigEnabled() {
ReadAndUpdateSyncAdmiralConfig(rr)
}

return rr
}

Expand Down
2 changes: 1 addition & 1 deletion admiral/pkg/controller/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (
NamespaceKubeSystem = "kube-system"
NamespaceIstioSystem = "istio-system"
IstioIngressGatewayLabelValue = "istio-ingressgateway"
NLBIstioIngressGatewayLabelValue = "nlb-istio-ingressgateway"
NLBIstioIngressGatewayLabelValue = "istio-ingressgateway-nlb"
Env = "env"
DefaultMtlsPort = 15443
DefaultServiceEntryPort = 80
Expand Down