Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Choreo][Intern project] Response caching feature] #3539

Draft
wants to merge 13 commits into
base: choreo
Choose a base branch
from
Draft
7 changes: 7 additions & 0 deletions adapter/config/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ var defaultConfig = &Config{
ListenerPort: 9090,
SecuredListenerHost: "0.0.0.0",
SecuredListenerPort: 9095,
NewListenerHost: "0.0.0.0",
NewListenerPort: 9096,
ClusterTimeoutInSeconds: 20,
EnforcerResponseTimeoutInSeconds: 20,
MaximumResourcePathLengthInKB: -1,
Expand Down Expand Up @@ -160,6 +162,11 @@ var defaultConfig = &Config{
SSLCertSANHostname: "",
},
},
Varnish: varnish{
Enabled: true,
Host: "varnish",
Port: 80,
},
Enforcer: enforcer{
Management: management{
Username: "admin",
Expand Down
9 changes: 9 additions & 0 deletions adapter/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type Config struct {
GlobalAdapter globalAdapter `toml:"globalAdapter"`
Analytics analytics `toml:"analytics"`
Tracing tracing
Varnish varnish
}

// Adapter related Configurations
Expand Down Expand Up @@ -112,6 +113,8 @@ type envoy struct {
ListenerPort uint32
SecuredListenerHost string
SecuredListenerPort uint32
NewListenerHost string
NewListenerPort uint32
ClusterTimeoutInSeconds time.Duration
EnforcerResponseTimeoutInSeconds time.Duration `default:"20"`
KeyStore keystore
Expand Down Expand Up @@ -148,6 +151,12 @@ type rateLimit struct {
SSLCertSANHostname string
}

type varnish struct{
Enabled bool
Host string
Port uint32
}

type xRateLimitHeaders struct {
Enabled bool
RFCVersion string
Expand Down
10 changes: 10 additions & 0 deletions adapter/internal/oasparser/config_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ func GetGlobalClusters() ([]*clusterv3.Cluster, []*corev3.Address) {
}
}

if conf.Varnish.Enabled {
varnishCluster, varnishEP, errVarnish := envoy.CreateVarnishCluster()
if errVarnish == nil {
clusters = append(clusters, varnishCluster)
endpoints = append(endpoints, varnishEP...)
} else {
logger.LoggerOasparser.Fatalf("Failed to initialize Varnish cluster. Hence terminating the adapter. Error: %s", errVarnish)
}
}

if conf.Tracing.Enabled && conf.Tracing.Type != envoyconf.TracerTypeAzure {
logger.LoggerOasparser.Debugln("Creating global cluster - Tracing")
if c, e, err := envoyconf.CreateTracingCluster(conf); err == nil {
Expand Down
3 changes: 3 additions & 0 deletions adapter/internal/oasparser/envoyconf/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
tracingClusterName string = "wso2_cc_trace"
extAuthzHTTPCluster string = "ext_authz_http_cluster"
rateLimitClusterName string = "rate-limit"
varnishClusterName string = "varnish"
)

const (
Expand Down Expand Up @@ -58,6 +59,7 @@ const (
defaultRdsConfigName string = "default"
defaultHTTPListenerName string = "HTTPListener"
defaultHTTPSListenerName string = "HTTPSListener"
newHTTPListenerName string = "NewHTTPListener"
defaultAccessLogPath string = "/tmp/envoy.access.log"
defaultListenerSecretConfigName string = "DefaultListenerSecret"
)
Expand Down Expand Up @@ -105,6 +107,7 @@ const (
)
const (
httpsURLType string = "https"
httpURLType string = "http"
wssURLType string = "wss"
httpMethodHeader string = ":method"
)
Expand Down
10 changes: 10 additions & 0 deletions adapter/internal/oasparser/envoyconf/http_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ func getHTTPFilters() []*hcmv3.HttpFilter {
return httpFilters
}

// get router filter in a array
func getHTTPRouterFilters() []*hcmv3.HttpFilter {
router := getRouterHTTPFilter()

httpRouterFilters := []*hcmv3.HttpFilter{
router,
}
return httpRouterFilters
}

// getRouterHTTPFilter gets router http filter.
func getRouterHTTPFilter() *hcmv3.HttpFilter {

Expand Down
154 changes: 107 additions & 47 deletions adapter/internal/oasparser/envoyconf/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"github.com/wso2/product-microgateway/adapter/config"
logger "github.com/wso2/product-microgateway/adapter/internal/loggers"
config_access_logv3 "github.com/envoyproxy/go-control-plane/envoy/config/accesslog/v3"
)

// CreateRoutesConfigForRds generates the default RouteConfiguration.
Expand Down Expand Up @@ -78,56 +79,14 @@ func CreateListenersWithRds() []*listenerv3.Listener {
func createListeners(conf *config.Config) []*listenerv3.Listener {
httpFilters := getHTTPFilters()
upgradeFilters := getUpgradeFilters()
router := getHTTPRouterFilters()
accessLogs := getAccessLogs()
var filters []*listenerv3.Filter
var customFilters []*listenerv3.Filter
var listeners []*listenerv3.Listener

manager := &hcmv3.HttpConnectionManager{
CodecType: hcmv3.HttpConnectionManager_AUTO,
StatPrefix: httpConManagerStartPrefix,
// WebSocket upgrades enabled from the HCM
UpgradeConfigs: []*hcmv3.HttpConnectionManager_UpgradeConfig{{
UpgradeType: "websocket",
Enabled: &wrappers.BoolValue{Value: true},
Filters: upgradeFilters,
}},
RouteSpecifier: &hcmv3.HttpConnectionManager_Rds{
Rds: &hcmv3.Rds{
RouteConfigName: defaultRdsConfigName,
ConfigSource: &corev3.ConfigSource{
ConfigSourceSpecifier: &corev3.ConfigSource_Ads{
Ads: &corev3.AggregatedConfigSource{},
},
ResourceApiVersion: corev3.ApiVersion_V3,
},
},
},
HttpFilters: httpFilters,
LocalReplyConfig: &hcmv3.LocalReplyConfig{
Mappers: getErrorResponseMappers(),
},
RequestTimeout: ptypes.DurationProto(conf.Envoy.Connection.Timeouts.RequestTimeoutInSeconds * time.Second), // default disabled
RequestHeadersTimeout: ptypes.DurationProto(conf.Envoy.Connection.Timeouts.RequestHeadersTimeoutInSeconds * time.Second), // default disabled
StreamIdleTimeout: ptypes.DurationProto(conf.Envoy.Connection.Timeouts.StreamIdleTimeoutInSeconds * time.Second), // Default 5 mins
CommonHttpProtocolOptions: &corev3.HttpProtocolOptions{
IdleTimeout: ptypes.DurationProto(conf.Envoy.Connection.Timeouts.IdleTimeoutInSeconds * time.Second), // Default 1 hr
},
}

if len(accessLogs) > 0 {
manager.AccessLog = accessLogs
}

if conf.Tracing.Enabled && conf.Tracing.Type != TracerTypeAzure {
if tracing, err := getTracing(conf); err == nil {
manager.Tracing = tracing
manager.GenerateRequestId = &wrappers.BoolValue{Value: conf.Tracing.Enabled}
} else {
logger.LoggerOasparser.Error("Failed to initialize tracing. Router tracing will be disabled. ", err)
conf.Tracing.Enabled = false
}
}

//creating the manager for exisitng listener
manager := createHTTPConnectionManager(httpFilters, upgradeFilters, accessLogs, conf)
pbst, err := anypb.New(manager)
if err != nil {
logger.LoggerOasparser.Fatal(err)
Expand All @@ -138,10 +97,25 @@ func createListeners(conf *config.Config) []*listenerv3.Listener {
TypedConfig: pbst,
},
}

// add filters
filters = append(filters, &connectionManagerFilterP)

// Creating the new manager for new listener
newManager := createHTTPConnectionManager(router, router, accessLogs, conf)
pbstNewManager, err := anypb.New(newManager)
if err != nil {
logger.LoggerOasparser.Fatal(err)
}
connectionManagerFilterNew := listenerv3.Filter{
Name: wellknown.HTTPConnectionManager,
ConfigType: &listenerv3.Filter_TypedConfig{
TypedConfig: pbstNewManager,
},
}
// add new filters
customFilters = append(customFilters, &connectionManagerFilterNew)


if conf.Envoy.SecuredListenerPort > 0 {
listenerHostAddress := defaultListenerHostAddress
if len(conf.Envoy.SecuredListenerHost) > 0 {
Expand Down Expand Up @@ -228,6 +202,39 @@ func createListeners(conf *config.Config) []*listenerv3.Listener {
logger.LoggerOasparser.Info("No Non-securedListenerPort is included.")
}

// new listener
if conf.Envoy.NewListenerPort > 0 {
listenerHostAddress := defaultListenerHostAddress
if len(conf.Envoy.NewListenerHost) > 0 {
listenerHostAddress = conf.Envoy.NewListenerHost
}
listenerAddress := &corev3.Address_SocketAddress{
SocketAddress: &corev3.SocketAddress{
Protocol: corev3.SocketAddress_TCP,
Address: listenerHostAddress,
PortSpecifier: &corev3.SocketAddress_PortValue{
PortValue: conf.Envoy.NewListenerPort,
},
},
}

listener := listenerv3.Listener{
Name: newHTTPListenerName,
Address: &corev3.Address{
Address: listenerAddress,
},
FilterChains: []*listenerv3.FilterChain{{
Filters: customFilters,
},
},
}
listeners = append(listeners, &listener)
logger.LoggerOasparser.Infof("New Listener is added. %s : %d", listenerHostAddress, conf.Envoy.NewListenerPort)
} else {
logger.LoggerOasparser.Info("No NewListenerPort is included.")
}


if len(listeners) == 0 {
err := errors.New("No Listeners are configured as no port value is mentioned under securedListenerPort or ListenerPort")
logger.LoggerOasparser.Fatal(err)
Expand Down Expand Up @@ -319,3 +326,56 @@ func getTracing(conf *config.Config) (*hcmv3.HttpConnectionManager_Tracing, erro

return tracing, nil
}

// function to create http managers
func createHTTPConnectionManager(httpFilters []*hcmv3.HttpFilter, upgradeFilters []*hcmv3.HttpFilter, accessLogs []*config_access_logv3.AccessLog, conf *config.Config) *hcmv3.HttpConnectionManager {
manager := &hcmv3.HttpConnectionManager{
CodecType: hcmv3.HttpConnectionManager_AUTO,
StatPrefix: httpConManagerStartPrefix,
// WebSocket upgrades enabled from the HCM
UpgradeConfigs: []*hcmv3.HttpConnectionManager_UpgradeConfig{{
UpgradeType: "websocket",
Enabled: &wrappers.BoolValue{Value: true},
Filters: upgradeFilters,
}},
RouteSpecifier: &hcmv3.HttpConnectionManager_Rds{
Rds: &hcmv3.Rds{
RouteConfigName: defaultRdsConfigName,
ConfigSource: &corev3.ConfigSource{
ConfigSourceSpecifier: &corev3.ConfigSource_Ads{
Ads: &corev3.AggregatedConfigSource{},
},
ResourceApiVersion: corev3.ApiVersion_V3,
},
},
},
HttpFilters: httpFilters,
LocalReplyConfig: &hcmv3.LocalReplyConfig{
Mappers: getErrorResponseMappers(),
},
RequestTimeout: ptypes.DurationProto(conf.Envoy.Connection.Timeouts.RequestTimeoutInSeconds * time.Second), // default disabled
RequestHeadersTimeout: ptypes.DurationProto(conf.Envoy.Connection.Timeouts.RequestHeadersTimeoutInSeconds * time.Second), // default disabled
StreamIdleTimeout: ptypes.DurationProto(conf.Envoy.Connection.Timeouts.StreamIdleTimeoutInSeconds * time.Second), // Default 5 mins
CommonHttpProtocolOptions: &corev3.HttpProtocolOptions{
IdleTimeout: ptypes.DurationProto(conf.Envoy.Connection.Timeouts.IdleTimeoutInSeconds * time.Second), // Default 1 hr
},

}


if len(accessLogs) > 0 {
manager.AccessLog = accessLogs
}

if conf.Tracing.Enabled && conf.Tracing.Type != TracerTypeAzure {
if tracing, err := getTracing(conf); err == nil {
manager.Tracing = tracing
manager.GenerateRequestId = &wrappers.BoolValue{Value: conf.Tracing.Enabled}
} else {
logger.LoggerOasparser.Error("Failed to initialize tracing. Router tracing will be disabled. ", err)
conf.Tracing.Enabled = false
}
}

return manager
}
2 changes: 1 addition & 1 deletion adapter/internal/oasparser/envoyconf/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestCreateListenerWithRds(t *testing.T) {
// TODO: (Vajira) Add more test scenarios
listeners := CreateListenersWithRds()
assert.NotEmpty(t, listeners, "Listeners creation has been failed")
assert.Equal(t, 2, len(listeners), "Two listeners are not created.")
assert.Equal(t, 3, len(listeners), "Two listeners are not created.")

securedListener := listeners[0]
if securedListener.Validate() != nil {
Expand Down
23 changes: 23 additions & 0 deletions adapter/internal/oasparser/envoyconf/routes_with_clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,29 @@ func CreateRateLimitCluster() (*clusterv3.Cluster, []*corev3.Address, error) {
return cluster, address, nil
}

// CreateVarnishCluster creates cluster relevant to the varnish
func CreateVarnishCluster() (*clusterv3.Cluster, []*corev3.Address, error) {

conf, _ := config.ReadConfigs()

varnishCluster := &model.EndpointCluster{
Endpoints: []model.Endpoint{
{
Host: conf.Varnish.Host,
URLType: httpURLType,
Port: conf.Varnish.Port,
},
},
}

cluster, address, varnishErr := processEndpoints(varnishClusterName, varnishCluster, nil, 20, "")
if varnishErr != nil {
return nil, nil, varnishErr
}

return cluster, address, nil
}

// CreateTracingCluster creates a cluster definition for router's tracing server.
func CreateTracingCluster(conf *config.Config) (*clusterv3.Cluster, []*corev3.Address, error) {
var epHost string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class ResourceConfig {
private Map<String, List<String>> securitySchemas = new HashMap(); // security_schema_name -> scopes
private String tier = "Unlimited";
private boolean disableSecurity = false;
public boolean responseCache = false;
private Map<String, EndpointCluster> endpoints; // "PRODUCTION" OR "SANDBOX" -> endpoint cluster

/**
Expand Down Expand Up @@ -128,5 +129,10 @@ public void setEndpoints(Map<String, EndpointCluster> endpoints) {
this.endpoints = endpoints;
}

// check for the response caching enabled apis
public boolean isResponseCache() {
return responseCache;
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class APIConstants {
public static final String GATEWAY_PUBLIC_CERTIFICATE_ALIAS = "gateway_certificate_alias";
public static final String WSO2_PUBLIC_CERTIFICATE_ALIAS = "wso2carbon";
public static final String HTTPS_PROTOCOL = "https";
public static final String HTTP_GET_METHOD = "GET";
public static final String HTTP_HEAD_METHOD = "HEAD";
public static final String SUPER_TENANT_DOMAIN_NAME = "carbon.super";
public static final String BANDWIDTH_TYPE = "bandwidthVolume";
public static final String AUTHORIZATION_HEADER_DEFAULT = "Authorization";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
public class AdapterConstants {
// The header which should be populated to set the upstream cluster
public static final String CLUSTER_HEADER = "x-wso2-cluster-header";
// The header which should be populated to set the upstream cluster when response caching is enabled
public static final String ACTUAL_CLUSTER_HEADER = "x-wso2-actual-cluster";
// The header which should be populated to set the actual request path when response caching is enabled
public static final String PATH_HEADER = "x-wso2-request-path";
// The header which should be populated to set the upstream cluster when response caching is enabled
public static final String ACTUAL_HOST_HEADER = "x-wso2-actual-host";
// The key which specifies the production cluster name inside the request context
public static final String PROD_CLUSTER_HEADER_KEY = "prodClusterName";
// The key which specifies the sandbox cluster name inside the request context
Expand Down
Loading
Loading