Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chunk fetching APIs from Control Plane on startup #3495

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions adapter/config/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,9 @@ var defaultConfig = &Config{
QueueSizePerPool: 1000,
PauseTimeAfterFailure: 5,
},
InitialFetch: initialFetch{
ChunkSize: 10000,
},
},
GlobalAdapter: globalAdapter{
Enabled: false,
Expand Down
5 changes: 5 additions & 0 deletions adapter/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ type controlPlane struct {
BrokerConnectionParameters brokerConnectionParameters
HTTPClient httpClient
RequestWorkerPool requestWorkerPool
InitialFetch initialFetch
}

type dynamicEnvironments struct {
Expand All @@ -498,6 +499,10 @@ type requestWorkerPool struct {
PauseTimeAfterFailure time.Duration
}

type initialFetch struct {
ChunkSize int
}

type globalAdapter struct {
Enabled bool
ServiceURL string
Expand Down
30 changes: 25 additions & 5 deletions adapter/internal/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
healthservice "github.com/wso2/product-microgateway/adapter/pkg/health/api/wso2/health/service"
sync "github.com/wso2/product-microgateway/adapter/pkg/synchronizer"
"github.com/wso2/product-microgateway/adapter/pkg/tlsutils"
"github.com/wso2/product-microgateway/adapter/pkg/utils"

"context"
"flag"
Expand Down Expand Up @@ -328,9 +329,31 @@ OUTER:
logger.LoggerMgw.Info("Bye!")
}

// fetch APIs from control plane during the server start up and push them
// fetchAPIsOnStartUp fetches APIs from control plane chunk by chunk during the server start up and push them
// to the router and enforcer components.
func fetchAPIsOnStartUp(conf *config.Config, apiUUIDList []string) {
if apiUUIDList == nil {
logger.LoggerMgw.Info("Fetching APIs at startup...")
fetchChunkedAPIsOnStartUp(conf, nil, common.XdsOptions{})
} else {
logger.LoggerMgw.Infof("Fetching APIs at startup with the received API UUID list (size: %d)...", len(apiUUIDList))

chunkedAPIUuidsList := utils.ChunkSlice(apiUUIDList, conf.ControlPlane.InitialFetch.ChunkSize)
for i, chunkedAPIUuids := range chunkedAPIUuidsList {
logger.LoggerMgw.Infof("Fetching chunked APIs... [%d/%d]", i+1, len(chunkedAPIUuidsList))
isNotFinalChunk := i != len(chunkedAPIUuidsList)-1
fetchChunkedAPIsOnStartUp(conf, chunkedAPIUuids, common.XdsOptions{SkipUpdatingXdsCache: isNotFinalChunk})
}
}

// All apis are fetched. Deploy the /ready route for the readiness and startup probes.
xds.DeployReadinessAPI(conf.ControlPlane.EnvironmentLabels)
logger.LoggerMgw.Info("Fetching APIs at startup is completed...")
}

// fetch APIs from control plane during the server start up and push them
// to the router and enforcer components.
func fetchChunkedAPIsOnStartUp(conf *config.Config, apiUUIDList []string, xdsOptions common.XdsOptions) {
// Populate data from config.
envs := conf.ControlPlane.EnvironmentLabels

Expand Down Expand Up @@ -361,7 +384,7 @@ func fetchAPIsOnStartUp(conf *config.Config, apiUUIDList []string) {
if data.Resp != nil {
// For successfull fetches, data.Resp would return a byte slice with API project(s)
logger.LoggerMgw.Debug("Pushing data to router and enforcer")
err := synchronizer.PushAPIProjects(data.Resp, envs)
err := synchronizer.PushAPIProjects(data.Resp, envs, xdsOptions)
if err != nil {
logger.LoggerMgw.Errorf("Error occurred while pushing API data: %v ", err)
}
Expand All @@ -382,9 +405,6 @@ func fetchAPIsOnStartUp(conf *config.Config, apiUUIDList []string) {
}
}
}
// All apis are fetched. Deploy the /ready route for the readiness and startup probes.
xds.DeployReadinessAPI(envs)
logger.LoggerMgw.Info("Fetching APIs at startup is completed...")
}

// FetchAPIUUIDsFromGlobalAdapter get the UUIDs of the APIs at the LA startup from GA
Expand Down
6 changes: 4 additions & 2 deletions adapter/internal/api/apis_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/wso2/product-microgateway/adapter/config"
apiModel "github.com/wso2/product-microgateway/adapter/internal/api/models"
"github.com/wso2/product-microgateway/adapter/internal/common"
xds "github.com/wso2/product-microgateway/adapter/internal/discovery/xds"
"github.com/wso2/product-microgateway/adapter/internal/loggers"
"github.com/wso2/product-microgateway/adapter/internal/notifier"
Expand Down Expand Up @@ -221,7 +222,7 @@ func validateAndUpdateXds(apiProject mgw.ProjectAPI, override *bool) (err error)

// TODO: (renuka) optimize to update cache only once when all internal memory maps are updated
for vhost, environments := range vhostToEnvsMap {
_, err = xds.UpdateAPI(vhost, apiProject, environments)
_, err = xds.UpdateAPI(vhost, apiProject, environments, common.XdsOptions{})
if err != nil {
return
}
Expand All @@ -235,6 +236,7 @@ func ApplyAPIProjectFromAPIM(
payload []byte,
vhostToEnvsMap map[string][]*synchronizer.GatewayLabel,
apiEnvs map[string]map[string]synchronizer.APIEnvProps,
xdsOptions common.XdsOptions,
) (deployedRevisionList []*notifier.DeployedAPIRevision, err error) {
apiProject, err := extractAPIProject(payload)
if err != nil {
Expand Down Expand Up @@ -280,7 +282,7 @@ func ApplyAPIProjectFromAPIM(
loggers.LoggerAPI.Debugf("Update all environments (%v) of API %v %v:%v with UUID \"%v\".",
environments, vhost, apiYaml.Name, apiYaml.Version, apiYaml.ID)
// first update the API for vhost
deployedRevision, err := xds.UpdateAPI(vhost, apiProject, environments)
deployedRevision, err := xds.UpdateAPI(vhost, apiProject, environments, xdsOptions)
if err != nil {
return deployedRevisionList, fmt.Errorf("%v:%v with UUID \"%v\"", apiYaml.Name, apiYaml.Version, apiYaml.ID)
}
Expand Down
23 changes: 23 additions & 0 deletions adapter/internal/common/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package common

// XdsOptions represents the options for xDS.
type XdsOptions struct {
SkipUpdatingXdsCache bool
}
21 changes: 15 additions & 6 deletions adapter/internal/discovery/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
envoy_resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/wso2/product-microgateway/adapter/config"
apiModel "github.com/wso2/product-microgateway/adapter/internal/api/models"
"github.com/wso2/product-microgateway/adapter/internal/common"
logger "github.com/wso2/product-microgateway/adapter/internal/loggers"
"github.com/wso2/product-microgateway/adapter/internal/notifier"
oasParser "github.com/wso2/product-microgateway/adapter/internal/oasparser"
Expand Down Expand Up @@ -271,7 +272,9 @@ func DeployReadinessAPI(envs []string) {
}

// UpdateAPI updates the Xds Cache when OpenAPI Json content is provided
func UpdateAPI(vHost string, apiProject mgw.ProjectAPI, deployedEnvironments []*synchronizer.GatewayLabel) (*notifier.DeployedAPIRevision, error) {
func UpdateAPI(vHost string, apiProject mgw.ProjectAPI, deployedEnvironments []*synchronizer.GatewayLabel,
xdsOptions common.XdsOptions) (*notifier.DeployedAPIRevision, error) {

var mgwSwagger mgw.MgwSwagger
var deployedRevision *notifier.DeployedAPIRevision
var err error
Expand Down Expand Up @@ -520,12 +523,18 @@ func UpdateAPI(vHost string, apiProject mgw.ProjectAPI, deployedEnvironments []*
}

// TODO: (VirajSalaka) Fault tolerance mechanism implementation
revisionStatus := updateXdsCacheOnAPIAdd(oldLabels, routerLabels)
if revisionStatus {
// send updated revision to control plane
deployedRevision = notifier.UpdateDeployedRevisions(apiYaml.ID, apiYaml.RevisionID, environments,
vHost)
// Skipping the xDS cache update is fine as Choreo uses one gateway label for single Choreo Connect deployment.
if !xdsOptions.SkipUpdatingXdsCache {
logger.LoggerXds.Debugf("Updating the XDS cache for the API %v:%v", apiYaml.Name, apiYaml.Version)
updateXdsCacheOnAPIAdd(oldLabels, routerLabels)
} else {
logger.LoggerXds.Debugf("Skipping the XDS cache update for the API %v:%v", apiYaml.Name, apiYaml.Version)
}

// Send updated revision without checking the error state of xDS cache consistent state by assuming it should be consistent for the resources
// created by the Adapter.
deployedRevision = notifier.UpdateDeployedRevisions(apiYaml.ID, apiYaml.RevisionID, environments,
vHost)
if svcdiscovery.IsServiceDiscoveryEnabled {
startConsulServiceDiscovery(organizationID) //consul service discovery starting point
}
Expand Down
15 changes: 10 additions & 5 deletions adapter/internal/synchronizer/apis_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,19 @@ func init() {
// byte slice. This method ensures to update the enforcer and router using entries inside the
// downloaded apis.zip one by one.
// If the updating envoy or enforcer fails, this method returns an error, if not error would be nil.
func PushAPIProjects(payload []byte, environments []string) error {
func PushAPIProjects(payload []byte, environments []string, xdsOptions common.XdsOptions) error {
var deploymentList []*notifier.DeployedAPIRevision
// Reading the root zip
zipReader, err := zip.NewReader(bytes.NewReader(payload), int64(len(payload)))
if err != nil {
logger.LoggerSync.Errorf("Error occured while unzipping the apictl project. Error: %v", err.Error())
return err
}
logger.LoggerSync.Infof("Start Deploying %d API/s...", len(zipReader.File)-2)
apisCount := len(zipReader.File) - 2
logger.LoggerSync.Infof("Start Deploying %d API/s...", apisCount)

// apiFiles represents zipped API files fetched from API Manager
apiFiles := make(map[string]*zip.File, len(zipReader.File)-1)
apiFiles := make(map[string]*zip.File, apisCount)
// Read deployments from deployment.json file
deploymentDescriptor, envProps, err := sync.ReadRootFiles(zipReader)
if err != nil {
Expand Down Expand Up @@ -112,13 +113,17 @@ func PushAPIProjects(payload []byte, environments []string) error {
// Pass the byte slice for the XDS APIs to push it to the enforcer and router
// TODO: (renuka) optimize applying API project, update maps one by one and apply xds once
var deployedRevisionList []*notifier.DeployedAPIRevision
deployedRevisionList, err = apiServer.ApplyAPIProjectFromAPIM(apiFileData, vhostToEnvsMap, envProps)
deployedRevisionList, err = apiServer.ApplyAPIProjectFromAPIM(apiFileData, vhostToEnvsMap, envProps, xdsOptions)
if err != nil {
logger.LoggerSync.Errorf("Error occurred while applying project %v", err)
} else if deployedRevisionList != nil {
deploymentList = append(deploymentList, MergeDeployedRevisionList(deployedRevisionList)...)
}
}

// TODO: (renuka) notify the revision deployment to the control plane once all chunks are deployed.
// This is not fixed as notify the control plane chunk by chunk (even though the chunk is not really applied to the Enforcer and Router) is not a drastic issue.
// This path is only happening when Adapter is restarting and at that time the deployed time is already updated in the control plane.
notifier.SendRevisionUpdate(deploymentList)
logger.LoggerSync.Infof("Successfully deployed %d API/s", len(deploymentList))
// Error nil for successful execution
Expand Down Expand Up @@ -227,7 +232,7 @@ func FetchAPIsFromControlPlane(updatedAPIID string, updatedEnvs []string, envToD
// For successfull fetches, data.Resp would return a byte slice with API project(s)
logger.LoggerSync.Infof("Pushing data to router and enforcer for the API %q", updatedAPIID)
receivedArtifact = true
err := PushAPIProjects(data.Resp, finalEnvs)
err := PushAPIProjects(data.Resp, finalEnvs, common.XdsOptions{})
if err != nil {
logger.LoggerSync.Errorf("Error occurred while pushing API data for the API %q: %v ", updatedAPIID, err)
}
Expand Down
3 changes: 0 additions & 3 deletions adapter/pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ func GetAPIs(c chan synchronizer.SyncAPIResponse, id *string, envs []string, end
logger.LoggerAdapter.Debugf("Environments label present: %v", envs)
go synchronizer.FetchAPIs(id, envs, c, endpoint, sendType, apiUUIDList, queryParamMap)
} else {
// If the environments are not give, fetch the APIs from default envrionment
logger.LoggerAdapter.Debug("Environments label NOT present. Hence adding \"default\"")
envs = append(envs, "default")
go synchronizer.FetchAPIs(id, nil, c, endpoint, sendType, apiUUIDList, queryParamMap)
}
}
32 changes: 32 additions & 0 deletions adapter/pkg/utils/slice_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package utils contains the common utility functions
package utils

// ChunkSlice splits the given slice into chunks of the given size
func ChunkSlice(slice []string, chunkSize int) [][]string {
var chunks [][]string
for i := 0; i < len(slice); i += chunkSize {
end := i + chunkSize
if end > len(slice) {
end = len(slice)
}
chunks = append(chunks, slice[i:end])
}
return chunks
}
40 changes: 40 additions & 0 deletions adapter/pkg/utils/slice_utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package utils

import "testing"

func TestChunkSlice(t *testing.T) {
slice := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
chunkSize := 3
expectedChunks := [][]string{{"a", "b", "c"}, {"d", "e", "f"}, {"g", "h", "i"}, {"j"}}
chunks := ChunkSlice(slice, chunkSize)
if len(chunks) != len(expectedChunks) {
t.Errorf("Expected chunks length: %d, but got: %d", len(expectedChunks), len(chunks))
}
for i, chunk := range chunks {
if len(chunk) != len(expectedChunks[i]) {
t.Errorf("Expected chunk length: %d, but got: %d", len(expectedChunks[i]), len(chunk))
}
for j, val := range chunk {
if val != expectedChunks[i][j] {
t.Errorf("Expected chunk value: %s, but got: %s", expectedChunks[i][j], val)
}
}
}
}
4 changes: 4 additions & 0 deletions resources/conf/config.toml.template
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,10 @@ enabled = true
# HTTP client configuration.
[controlPlane.hTTPClient]
requestTimeOut = 30
# Initial fetch configurations
[controlPlane.initialFetch]
# Number of APIs to be fetched in a single request to the Control Plane
chunkSize = 10000

# Global Adapter related configurations
[globalAdapter]
Expand Down
Loading