From 4a350c5169fc09dbc6affc66325ef30e8f2c81ae Mon Sep 17 00:00:00 2001 From: JimDevil <709192853@qq.com> Date: Sun, 19 Jan 2025 15:27:42 +0800 Subject: [PATCH] feat: Implementation aggregator apiserver and resourcecache controller manager 1. aggregator apiserver install resourcecache and proxy api group 2. controller for reconcile resourcecache and handle proxy request Signed-off-by: chengjin --- .../proxy/app/clusterlink-proxy.go | 57 ++-- cmd/clusterlink/proxy/app/options/options.go | 206 ++++++------ deploy/clusterlink-proxy.yml | 57 ---- pkg/apis/proxy/install/install.go | 5 +- pkg/apis/proxy/scheme/register.go | 4 +- pkg/clusterlink/proxy/apiserver.go | 50 +-- .../proxy/controller/controller.go | 292 ++++++++++++++++++ .../proxy/controller/controller_test.go | 91 ++++++ .../proxy/delegate/apiserver/apiserver.go | 68 ++++ pkg/clusterlink/proxy/delegate/cache/cache.go | 135 ++++++++ pkg/clusterlink/proxy/delegate/interface.go | 78 +++++ pkg/clusterlink/proxy/storage.go | 239 -------------- pkg/clusterlink/proxy/store/cluster_cache.go | 86 +++--- pkg/clusterlink/proxy/store/resource_cache.go | 25 +- pkg/clusterlink/proxy/store/store.go | 121 +++++++- .../clusterlink/storage/proxy_test.go | 7 +- pkg/sharedcli/profileflag/profileflag.go | 101 ++++++ pkg/utils/constants.go | 8 + pkg/utils/utils.go | 48 +++ pkg/utils/utils_test.go | 41 +++ 20 files changed, 1196 insertions(+), 523 deletions(-) delete mode 100644 deploy/clusterlink-proxy.yml create mode 100644 pkg/clusterlink/proxy/controller/controller.go create mode 100644 pkg/clusterlink/proxy/controller/controller_test.go create mode 100644 pkg/clusterlink/proxy/delegate/apiserver/apiserver.go create mode 100644 pkg/clusterlink/proxy/delegate/cache/cache.go create mode 100644 pkg/clusterlink/proxy/delegate/interface.go delete mode 100644 pkg/clusterlink/proxy/storage.go create mode 100644 pkg/sharedcli/profileflag/profileflag.go diff --git a/cmd/clusterlink/proxy/app/clusterlink-proxy.go b/cmd/clusterlink/proxy/app/clusterlink-proxy.go index 85433b9c6..4684c2b73 100644 --- a/cmd/clusterlink/proxy/app/clusterlink-proxy.go +++ b/cmd/clusterlink/proxy/app/clusterlink-proxy.go @@ -5,11 +5,13 @@ import ( "fmt" "github.com/spf13/cobra" + genericapiserver "k8s.io/apiserver/pkg/server" cliflag "k8s.io/component-base/cli/flag" - "k8s.io/component-base/term" - "k8s.io/klog/v2" "github.com/kosmos.io/kosmos/cmd/clusterlink/proxy/app/options" + "github.com/kosmos.io/kosmos/pkg/sharedcli/klogflag" + profileflag "github.com/kosmos.io/kosmos/pkg/sharedcli/profileflag" + "github.com/kosmos.io/kosmos/pkg/utils" ) // NewClusterLinkProxyCommand creates a *cobra.Command object with default parameters @@ -17,15 +19,12 @@ func NewClusterLinkProxyCommand(ctx context.Context) *cobra.Command { opts := options.NewOptions() cmd := &cobra.Command{ - Use: "proxy", - Long: `The proxy starts a apiserver for agent access the backend proxy`, - RunE: func(cmd *cobra.Command, args []string) error { - // validate options - /* - if errs := opts.Validate(); len(errs) != 0 { - return errs.ToAggregate() - } - */ + Use: utils.KosmosClusrerLinkRroxyComponentName, + Long: `starts a server for agent kube-apiserver`, + RunE: func(_ *cobra.Command, _ []string) error { + if err := opts.Validate(); err != nil { + return err + } return run(ctx, opts) }, Args: func(cmd *cobra.Command, args []string) error { @@ -37,24 +36,26 @@ func NewClusterLinkProxyCommand(ctx context.Context) *cobra.Command { return nil }, } - namedFlagSets := opts.Flags() - fs := cmd.Flags() - for _, f := range 
namedFlagSets.FlagSets { - fs.AddFlagSet(f) - } + flags := cmd.Flags() - cols, _, err := term.TerminalSize(cmd.OutOrStdout()) - if err != nil { - klog.Warning("term.TerminalSize err: %v", err) - } else { - cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols) - } + fss := cliflag.NamedFlagSets{} + genericFlagSet := fss.FlagSet("generic") + opts.AddFlags(genericFlagSet) + + logsFlagSet := fss.FlagSet("logs") + klogflag.Add(logsFlagSet) + + flags.AddFlagSet(genericFlagSet) + flags.AddFlagSet(logsFlagSet) return cmd } func run(ctx context.Context, opts *options.Options) error { + // pprof + profileflag.ListenAndServe(opts.ProfileOpts) + config, err := opts.Config() if err != nil { return err @@ -65,5 +66,17 @@ func run(ctx context.Context, opts *options.Options) error { return err } + server.GenericAPIServer.AddPostStartHookOrDie("start-proxy-controller", func(context genericapiserver.PostStartHookContext) error { + go func() { + config.ExtraConfig.ProxyController.Run(context.StopCh, 1) + }() + return nil + }) + + server.GenericAPIServer.AddPostStartHookOrDie("start-apiserver-informer", func(context genericapiserver.PostStartHookContext) error { + config.ExtraConfig.KosmosInformerFactory.Start(context.StopCh) + return nil + }) + return server.GenericAPIServer.PrepareRun().Run(ctx.Done()) } diff --git a/cmd/clusterlink/proxy/app/options/options.go b/cmd/clusterlink/proxy/app/options/options.go index 97489f503..4bbae9ecb 100644 --- a/cmd/clusterlink/proxy/app/options/options.go +++ b/cmd/clusterlink/proxy/app/options/options.go @@ -2,118 +2,81 @@ package options import ( "fmt" + "log" "net" "net/http" "strings" + "github.com/spf13/pflag" utilerrors "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle" + "k8s.io/apiserver/pkg/endpoints/openapi" genericrequest "k8s.io/apiserver/pkg/endpoints/request" - genericapiserver "k8s.io/apiserver/pkg/server" + genericserver "k8s.io/apiserver/pkg/server" genericoptions "k8s.io/apiserver/pkg/server/options" - "k8s.io/apiserver/pkg/util/feature" - cliflag "k8s.io/component-base/cli/flag" - "k8s.io/component-base/featuregate" - "k8s.io/component-base/logs" - logsapi "k8s.io/component-base/logs/api/v1" + "k8s.io/client-go/dynamic" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + "github.com/kosmos.io/kosmos/pkg/apis/proxy/scheme" + proxyScheme "github.com/kosmos.io/kosmos/pkg/apis/proxy/scheme" "github.com/kosmos.io/kosmos/pkg/clusterlink/proxy" + proxyctl "github.com/kosmos.io/kosmos/pkg/clusterlink/proxy/controller" + kosmosclientset "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" + informerfactory "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions" + generatedopenapi "github.com/kosmos.io/kosmos/pkg/generated/openapi" + profileflag "github.com/kosmos.io/kosmos/pkg/sharedcli/profileflag" "github.com/kosmos.io/kosmos/pkg/utils" ) -// Options contains command line parameters for clusterlink-proxy type Options struct { - MaxRequestsInFlight int - MaxMutatingRequestsInFlight int - utils.KubernetesOptions - - Logs *logs.Options - SecureServing *genericoptions.SecureServingOptionsWithLoopback - Authentication *genericoptions.DelegatingAuthenticationOptions - Authorization *genericoptions.DelegatingAuthorizationOptions - Audit *genericoptions.AuditOptions - Features *genericoptions.FeatureOptions - CoreAPI *genericoptions.CoreAPIOptions - FeatureGate featuregate.FeatureGate - Admission *genericoptions.AdmissionOptions + // RecommendedOptions 
*genericoptions.RecommendedOptions + GenericServerRunOptions *genericoptions.ServerRunOptions + SecureServing *genericoptions.SecureServingOptionsWithLoopback + Authentication *genericoptions.DelegatingAuthenticationOptions + Authorization *genericoptions.DelegatingAuthorizationOptions + Audit *genericoptions.AuditOptions + Features *genericoptions.FeatureOptions + CoreAPI *genericoptions.CoreAPIOptions + ServerRunOptions *genericoptions.ServerRunOptions + + ProfileOpts profileflag.Options } -// nolint -func NewOptions() *Options { - sso := genericoptions.NewSecureServingOptions() - - // We are composing recommended options for an aggregated api-server, - // whose client is typically a proxy multiplexing many operations --- - // notably including long-running ones --- into one HTTP/2 connection - // into this server. So allow many concurrent operations. - sso.HTTP2MaxStreamsPerConnection = 1000 - - return &Options{ - MaxRequestsInFlight: 0, - MaxMutatingRequestsInFlight: 0, - - Logs: logs.NewOptions(), - SecureServing: sso.WithLoopback(), - Authentication: genericoptions.NewDelegatingAuthenticationOptions(), - Authorization: genericoptions.NewDelegatingAuthorizationOptions(), - Audit: genericoptions.NewAuditOptions(), - Features: genericoptions.NewFeatureOptions(), - CoreAPI: genericoptions.NewCoreAPIOptions(), - FeatureGate: feature.DefaultFeatureGate, - Admission: genericoptions.NewAdmissionOptions(), - } +func (o *Options) AddFlags(flags *pflag.FlagSet) { + o.SecureServing.AddFlags(flags) + o.Authentication.AddFlags(flags) + o.Authorization.AddFlags(flags) + o.Audit.AddFlags(flags) + o.Features.AddFlags(flags) + o.CoreAPI.AddFlags(flags) + o.ServerRunOptions.AddUniversalFlags(flags) + o.ProfileOpts.AddFlags(flags) } // nolint -func (o *Options) Validate() error { - errors := []error{} - errors = append(errors, o.validateGenericOptions()...) - return utilerrors.NewAggregate(errors) -} - -func (o *Options) validateGenericOptions() []error { - errors := []error{} - if o.MaxRequestsInFlight < 0 { - errors = append(errors, fmt.Errorf("--max-requests-inflight can not be negative value")) - } - if o.MaxMutatingRequestsInFlight < 0 { - errors = append(errors, fmt.Errorf("--max-mutating-requests-inflight can not be negative value")) +func NewOptions() *Options { + o := &Options{ + GenericServerRunOptions: genericoptions.NewServerRunOptions(), + SecureServing: genericoptions.NewSecureServingOptions().WithLoopback(), + Authentication: genericoptions.NewDelegatingAuthenticationOptions(), + Authorization: genericoptions.NewDelegatingAuthorizationOptions(), + Audit: genericoptions.NewAuditOptions(), + Features: genericoptions.NewFeatureOptions(), + CoreAPI: genericoptions.NewCoreAPIOptions(), + ServerRunOptions: genericoptions.NewServerRunOptions(), } - - errors = append(errors, o.CoreAPI.Validate()...) - errors = append(errors, o.SecureServing.Validate()...) - errors = append(errors, o.Authentication.Validate()...) - errors = append(errors, o.Authorization.Validate()...) - errors = append(errors, o.Audit.Validate()...) - errors = append(errors, o.Features.Validate()...) 
- return errors + return o } // nolint -func (o *Options) Flags() cliflag.NamedFlagSets { - var fss cliflag.NamedFlagSets - - genericfs := fss.FlagSet("generic") - genericfs.IntVar(&o.MaxRequestsInFlight, "max-requests-inflight", o.MaxRequestsInFlight, ""+ - "Otherwise, this flag limits the maximum number of non-mutating requests in flight, or a zero value disables the limit completely.") - genericfs.IntVar(&o.MaxMutatingRequestsInFlight, "max-mutating-requests-inflight", o.MaxMutatingRequestsInFlight, ""+ - "this flag limits the maximum number of mutating requests in flight, or a zero value disables the limit completely.") - - globalcfs := fss.FlagSet("global") - globalcfs.Float32Var(&o.KubernetesOptions.QPS, "kube-qps", utils.DefaultKubeQPS, "QPS to use while talking with kube-apiserver.") - globalcfs.IntVar(&o.KubernetesOptions.Burst, "kube-burst", utils.DefaultKubeBurst, "Burst to use while talking with kube-apiserver.") - o.CoreAPI.AddFlags(globalcfs) - o.SecureServing.AddFlags(fss.FlagSet("secure serving")) - o.Authentication.AddFlags(fss.FlagSet("authentication")) - o.Authorization.AddFlags(fss.FlagSet("authorization")) - o.Audit.AddFlags(fss.FlagSet("auditing")) - o.Features.AddFlags(fss.FlagSet("features")) - logsapi.AddFlags(o.Logs, fss.FlagSet("logs")) - - // o.Admission.AddFlags(fss.FlagSet("admission")) - // o.Traces.AddFlags(fss.FlagSet("traces")) - - return fss +func (o *Options) Validate() error { + errs := []error{} + errs = append(errs, o.SecureServing.Validate()...) + errs = append(errs, o.Authentication.Validate()...) + errs = append(errs, o.Authorization.Validate()...) + errs = append(errs, o.Audit.Validate()...) + errs = append(errs, o.Features.Validate()...) + return utilerrors.NewAggregate(errs) } // nolint @@ -126,52 +89,79 @@ func (o *Options) Config() (*proxy.Config, error) { return nil, fmt.Errorf("error create self-signed certificates: %v", err) } - // remove NamespaceLifecycle admission plugin explicitly - // current admission plugins: mutatingwebhook, validatingwebhook - o.Admission.DisablePlugins = append(o.Admission.DisablePlugins, lifecycle.PluginName) + // o.Admission.DisablePlugins = append(o.RecommendedOptions.Admission.DisablePlugins, lifecycle.PluginName) - genericConfig := genericapiserver.NewRecommendedConfig(proxy.Codecs) - // genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(openapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(apiserver.Scheme)) - // genericConfig.OpenAPIConfig.Info.Title = openAPITitle - // genericConfig.OpenAPIConfig.Info.Version= openAPIVersion + genericConfig := genericserver.NewRecommendedConfig(proxyScheme.Codecs) + genericConfig.OpenAPIConfig = genericserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(scheme.Scheme)) + genericConfig.OpenAPIConfig.Info.Title = utils.KosmosClusrerLinkRroxyComponentName + genericConfig.OpenAPIConfig.Info.Version = utils.ClusterLinkOpenAPIVersion // support watch to LongRunningFunc genericConfig.LongRunningFunc = func(r *http.Request, requestInfo *genericrequest.RequestInfo) bool { return strings.Contains(r.RequestURI, "watch") } - if err := o.genericOptionsApplyTo(genericConfig); err != nil { + if err := o.ApplyTo(genericConfig); err != nil { + return nil, err + } + + restMapper, err := apiutil.NewDynamicRESTMapper(genericConfig.ClientConfig) + if err != nil { + klog.Errorf("Failed to create REST mapper: %v", err) + return nil, err + } + kosmosClient := kosmosclientset.NewForConfigOrDie(genericConfig.ClientConfig) + 
kosmosInformerFactory := informerfactory.NewSharedInformerFactory(kosmosClient, 0) + + dynamicClient, err := dynamic.NewForConfig(genericConfig.ClientConfig) + if err != nil { + log.Fatal(err) + } + + proxyCtl, err := proxyctl.NewResourceCacheController(proxyctl.NewControllerOption{ + RestConfig: genericConfig.ClientConfig, + RestMapper: restMapper, + KosmosFactory: kosmosInformerFactory, + DynamicClient: dynamicClient, + }) + if err != nil { return nil, err } return &proxy.Config{ GenericConfig: genericConfig, + ExtraConfig: proxy.ExtraConfig{ + ProxyController: proxyCtl, + KosmosInformerFactory: kosmosInformerFactory, + }, }, nil } -func (o *Options) genericOptionsApplyTo(config *genericapiserver.RecommendedConfig) error { - config.MaxRequestsInFlight = o.MaxRequestsInFlight - config.MaxMutatingRequestsInFlight = o.MaxMutatingRequestsInFlight - - if err := o.SecureServing.ApplyTo(&config.SecureServing, &config.LoopbackClientConfig); err != nil { +func (o *Options) ApplyTo(config *genericserver.RecommendedConfig) error { + if err := o.SecureServing.ApplyTo(&config.Config.SecureServing, &config.Config.LoopbackClientConfig); err != nil { return err } - if err := o.Authentication.ApplyTo(&config.Authentication, config.SecureServing, config.OpenAPIConfig); err != nil { + if err := o.Authentication.ApplyTo(&config.Config.Authentication, config.SecureServing, config.OpenAPIConfig); err != nil { return err } - if err := o.Authorization.ApplyTo(&config.Authorization); err != nil { + if err := o.Authorization.ApplyTo(&config.Config.Authorization); err != nil { return err } if err := o.Audit.ApplyTo(&config.Config); err != nil { return err } + if err := o.Features.ApplyTo(&config.Config); err != nil { return err } if err := o.CoreAPI.ApplyTo(config); err != nil { return err } - - utils.SetQPSBurst(config.ClientConfig, o.KubernetesOptions) - return o.Admission.ApplyTo(&config.Config, config.SharedInformerFactory, config.ClientConfig, o.FeatureGate) + if err := o.ServerRunOptions.ApplyTo(&config.Config); err != nil { + return err + } + if err := o.Features.ApplyTo(&config.Config); err != nil { + return err + } + return nil } diff --git a/deploy/clusterlink-proxy.yml b/deploy/clusterlink-proxy.yml deleted file mode 100644 index 453a293b0..000000000 --- a/deploy/clusterlink-proxy.yml +++ /dev/null @@ -1,57 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: clusterlink-proxy-service - namespace: kosmos-system -spec: - selector: - app: clusterlink-proxy - ports: - - protocol: TCP - port: 443 - targetPort: 443 - nodePort: 32443 - type: ClusterIP ---- -apiVersion: apps/v1 -kind: Deployment -metadata: - name: clusterlink-proxy - namespace: kosmos-system - labels: - app: clusterlink-proxy -spec: - replicas: 1 - selector: - matchLabels: - app: clusterlink-proxy - template: - metadata: - labels: - app: clusterlink-proxy - spec: - volumes: - - name: proxy-config - configMap: - defaultMode: 420 - name: clusterlink-agent-proxy - containers: - - name: manager - image: ghcr.io/kosmos-io/clusterlink-proxy:__VERSION__ - imagePullPolicy: IfNotPresent - command: - - clusterlink-proxy - - --kubeconfig=/etc/clusterlink/kubeconfig - - --authentication-kubeconfig=/etc/clusterlink/kubeconfig - - --authorization-kubeconfig=/etc/clusterlink/kubeconfig - resources: - limits: - memory: 500Mi - cpu: 500m - requests: - cpu: 500m - memory: 500Mi - volumeMounts: - - mountPath: /etc/clusterlink - name: proxy-config - readOnly: true diff --git a/pkg/apis/proxy/install/install.go b/pkg/apis/proxy/install/install.go index 
c79c98a85..f042c0a9a 100644 --- a/pkg/apis/proxy/install/install.go +++ b/pkg/apis/proxy/install/install.go @@ -17,10 +17,11 @@ limitations under the License. package install import ( - "github.com/kosmos.io/kosmos/pkg/apis/proxy" - "github.com/kosmos.io/kosmos/pkg/apis/proxy/v1alpha1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + + "github.com/kosmos.io/kosmos/pkg/apis/proxy" + "github.com/kosmos.io/kosmos/pkg/apis/proxy/v1alpha1" ) // Install registers the API group and adds types to a scheme. diff --git a/pkg/apis/proxy/scheme/register.go b/pkg/apis/proxy/scheme/register.go index 81ddc257b..028d7b684 100644 --- a/pkg/apis/proxy/scheme/register.go +++ b/pkg/apis/proxy/scheme/register.go @@ -17,13 +17,14 @@ limitations under the License. package scheme import ( - "github.com/kosmos.io/kosmos/pkg/apis/proxy/install" "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + + "github.com/kosmos.io/kosmos/pkg/apis/proxy/install" ) var ( @@ -40,5 +41,4 @@ func init() { install.Install(Scheme) utilruntime.Must(internalversion.AddToScheme(Scheme)) metav1.AddToGroupVersion(Scheme, schema.GroupVersion{Version: "v1"}) - } diff --git a/pkg/clusterlink/proxy/apiserver.go b/pkg/clusterlink/proxy/apiserver.go index 816bce23e..c46fff3b8 100644 --- a/pkg/clusterlink/proxy/apiserver.go +++ b/pkg/clusterlink/proxy/apiserver.go @@ -1,32 +1,27 @@ package proxy import ( - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/version" "k8s.io/apiserver/pkg/registry/rest" genericapiserver "k8s.io/apiserver/pkg/server" - "k8s.io/client-go/dynamic" clientrest "k8s.io/client-go/rest" - "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" - "github.com/kosmos.io/kosmos/pkg/scheme" -) - -var ( - // Scheme defines methods for serializing and deserializing API objects. - Scheme = scheme.NewSchema() - // Codecs provides methods for retrieving codecs and serializers for specific - // versions and content types. - Codecs = serializer.NewCodecFactory(Scheme) - - // ParameterCodec handles versioning of objects that are converted to query parameters. - ParameterCodec = runtime.NewParameterCodec(Scheme) + proxyScheme "github.com/kosmos.io/kosmos/pkg/apis/proxy/scheme" + "github.com/kosmos.io/kosmos/pkg/apis/proxy/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/clusterlink/proxy/controller" + informerfactory "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions" + clusterLinkStorage "github.com/kosmos.io/kosmos/pkg/registry/clusterlink/storage" ) // Config defines the config for the APIServer. type Config struct { GenericConfig *genericapiserver.RecommendedConfig + ExtraConfig ExtraConfig +} + +type ExtraConfig struct { + ProxyController *controller.ResourceCacheController + KosmosInformerFactory informerfactory.SharedInformerFactory } // APIServer defines the api server @@ -36,6 +31,7 @@ type APIServer struct { type completedConfig struct { GenericConfig genericapiserver.CompletedConfig + ExtraConfig *ExtraConfig ClientConfig *clientrest.Config } @@ -47,8 +43,8 @@ type CompletedConfig struct { // Complete fills in any fields not set that are required to have valid data. It's mutating the receiver. 
func (cfg *Config) Complete() CompletedConfig { c := completedConfig{ - cfg.GenericConfig.Complete(), - cfg.GenericConfig.ClientConfig, + GenericConfig: cfg.GenericConfig.Complete(), + ExtraConfig: &cfg.ExtraConfig, } c.GenericConfig.Version = &version.Info{ @@ -60,7 +56,7 @@ func (cfg *Config) Complete() CompletedConfig { } func (c completedConfig) New() (*APIServer, error) { - genericServer, err := c.GenericConfig.New("clusterlink-proxy", genericapiserver.NewEmptyDelegate()) + genericServer, err := c.GenericConfig.New("clusterlink-proxy-apiserver", genericapiserver.NewEmptyDelegate()) if err != nil { return nil, err } @@ -69,14 +65,18 @@ func (c completedConfig) New() (*APIServer, error) { GenericAPIServer: genericServer, } + apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo( + v1alpha1.GroupName, + proxyScheme.Scheme, + proxyScheme.ParameterCodec, + proxyScheme.Codecs, + ) + v1alpha1storage := map[string]rest.Storage{} - apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(v1alpha1.GroupName, Scheme, ParameterCodec, Codecs) - client, err := dynamic.NewForConfig(c.ClientConfig) - if err != nil { - return nil, err - } - v1alpha1storage["proxying"] = NewREST(c.ClientConfig, client) + + v1alpha1storage["proxying"] = clusterLinkStorage.NewProxyREST(c.ExtraConfig.ProxyController) apiGroupInfo.VersionedResourcesStorageMap["v1alpha1"] = v1alpha1storage + if err = server.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil { return nil, err } diff --git a/pkg/clusterlink/proxy/controller/controller.go b/pkg/clusterlink/proxy/controller/controller.go new file mode 100644 index 000000000..dc3d3e005 --- /dev/null +++ b/pkg/clusterlink/proxy/controller/controller.go @@ -0,0 +1,292 @@ +package controller + +import ( + "context" + "fmt" + "net/http" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + "k8s.io/apiserver/pkg/endpoints/metrics" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes/scheme" + clientrest "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + v1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/clusterlink/proxy/delegate" + apiserverdelegate "github.com/kosmos.io/kosmos/pkg/clusterlink/proxy/delegate/apiserver" + cachedelegate "github.com/kosmos.io/kosmos/pkg/clusterlink/proxy/delegate/cache" + "github.com/kosmos.io/kosmos/pkg/clusterlink/proxy/store" + informerfactory "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions" + lister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/utils" + "github.com/kosmos.io/kosmos/pkg/utils/lifted" +) + +const ( + maxRetries = 15 + minRequestTimeout = 15 +) + +type ResourceCacheController struct { + restMapper meta.RESTMapper + negotiatedSerializer runtime.NegotiatedSerializer + + cacheSynced cache.InformerSynced + resourceCacheLister lister.ResourceCacheLister + queue workqueue.RateLimitingInterface + enqueueResourceCache func(obj *v1alpha1.ResourceCache) + syncHandler func(key string) error + store store.Store + delegate delegate.Proxy +} + 
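For readers new to this controller shape: it is the standard client-go informer/workqueue reconcile loop. A minimal sketch of that loop follows; all names here are illustrative only and are not part of this patch.

	// miniController is a stripped-down version of the reconcile loop
	// that ResourceCacheController implements below.
	type miniController struct {
		synced cache.InformerSynced            // informer cache has synced
		queue  workqueue.RateLimitingInterface // keys awaiting reconcile
		sync   func(key string) error          // reconcile one key
	}

	func (c *miniController) Run(stopCh <-chan struct{}, workers int) {
		defer c.queue.ShutDown()
		if !cache.WaitForCacheSync(stopCh, c.synced) {
			return // cache never synced; nothing can be reconciled safely
		}
		for i := 0; i < workers; i++ {
			go wait.Until(func() {
				for c.processNext() {
				}
			}, time.Second, stopCh)
		}
		<-stopCh
	}

	func (c *miniController) processNext() bool {
		key, quit := c.queue.Get()
		if quit {
			return false
		}
		defer c.queue.Done(key)
		if err := c.sync(key.(string)); err != nil {
			c.queue.AddRateLimited(key) // retry with rate-limited backoff
			return true
		}
		c.queue.Forget(key) // success: reset the failure counter
		return true
	}
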
+type NewControllerOption struct { + RestConfig *clientrest.Config + DynamicClient dynamic.Interface + RestMapper meta.RESTMapper + KosmosFactory informerfactory.SharedInformerFactory +} + +func NewResourceCacheController(option NewControllerOption) (*ResourceCacheController, error) { + store := store.NewClusterCache(option.DynamicClient, option.RestMapper) + + rc := &ResourceCacheController{ + restMapper: option.RestMapper, + negotiatedSerializer: scheme.Codecs.WithoutConversion(), + store: store, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourceCache"), + cacheSynced: option.KosmosFactory.Kosmos().V1alpha1().ResourceCaches().Informer().HasSynced, + resourceCacheLister: option.KosmosFactory.Kosmos().V1alpha1().ResourceCaches().Lister(), + } + + resourceEventHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + r := obj.(*v1alpha1.ResourceCache) + klog.V(4).InfoS("Adding ResourceCache", "resourceCache", klog.KObj(r)) + rc.eventHandlerFunc(r) + }, + UpdateFunc: func(old, cur interface{}) { + oldR := old.(*v1alpha1.ResourceCache) + klog.V(4).InfoS("Updating ResourceCache", "resourceCache", klog.KObj(oldR)) + curR := cur.(*v1alpha1.ResourceCache) + rc.eventHandlerFunc(curR) + }, + DeleteFunc: func(obj interface{}) { + r := obj.(*v1alpha1.ResourceCache) + klog.V(4).InfoS("Deleting ResourceCache", "resourceCache", klog.KObj(r)) + rc.eventHandlerFunc(r) + }, + } + + _, err := option.KosmosFactory.Kosmos().V1alpha1().ResourceCaches().Informer().AddEventHandler(resourceEventHandler) + if err != nil { + klog.Errorf("Failed to add handler for Clusters: %v", err) + return nil, err + } + + // set delegate for proxy request + delegates, err := newDelegates(option, store) + if err != nil { + return nil, err + } + rc.delegate = delegate.NewDelegateChain(delegates) + + rc.enqueueResourceCache = rc.enqueue + rc.syncHandler = rc.syncResourceCache + return rc, nil +} + +// newDelegates the delegates for proxy request: cache->apiserver +func newDelegates(option NewControllerOption, store store.Store) ([]delegate.Delegate, error) { + delegateDependency := delegate.Dependency{ + RestConfig: option.RestConfig, + RestMapper: option.RestMapper, + Store: store, + MinRequestTimeout: minRequestTimeout * time.Second, + } + allDelegates := make([]delegate.Delegate, 0, 2) + allDelegates = append(allDelegates, cachedelegate.New(delegateDependency)) + apiserverdelegate, err := apiserverdelegate.New(delegateDependency) + if err != nil { + return allDelegates, err + } + allDelegates = append(allDelegates, apiserverdelegate) + return allDelegates, nil +} + +func (rc *ResourceCacheController) eventHandlerFunc(obj *v1alpha1.ResourceCache) { + rc.enqueueResourceCache(obj) +} + +func (rc *ResourceCacheController) enqueue(obj *v1alpha1.ResourceCache) { + name := obj.GetObjectMeta().GetName() + rc.queue.Add(name) +} + +// syncResourceCache will sync the resourceCache CR +// First list all resources, and then compare the previous cache to find out which are deleted +// and which are newly added +func (rc *ResourceCacheController) syncResourceCache(_ string) error { + // list all resourceCache CR + resourcesCaches, err := rc.resourceCacheLister.List(labels.Everything()) + if err != nil { + return err + } + // Define a map deduplication gvr + cachedResources := make(map[schema.GroupVersionResource]*utils.MultiNamespace) + + for _, resourceCache := range resourcesCaches { + for _, selector := range resourceCache.Spec.ResourceCacheSelectors { + gvr, err := 
rc.getGroupVersionResource(rc.restMapper, schema.FromAPIVersionAndKind(selector.APIVersion, selector.Kind))
+			if err != nil {
+				klog.Errorf("Failed to get gvr: %v", err)
+				continue
+			}
+			nsSelector, ok := cachedResources[gvr]
+			if !ok {
+				nsSelector = utils.NewMultiNamespace()
+				cachedResources[gvr] = nsSelector
+			}
+			nsSelector.Add(selector.Namespace...)
+			cachedResources[gvr] = nsSelector
+		}
+	}
+
+	return rc.store.UpdateCache(cachedResources)
+}
+
+func (rc *ResourceCacheController) getGroupVersionResource(restMapper meta.RESTMapper, gvk schema.GroupVersionKind) (schema.GroupVersionResource, error) {
+	restMapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
+	if err != nil {
+		return schema.GroupVersionResource{}, err
+	}
+	return restMapping.Resource, nil
+}
+
+func (rc *ResourceCacheController) Run(stopCh <-chan struct{}, workers int) {
+	defer utilruntime.HandleCrash()
+	defer rc.queue.ShutDown()
+
+	klog.InfoS("Starting controller", "controller", "resourceCache")
+	defer klog.InfoS("Shutting down controller", "controller", "resourceCache")
+
+	klog.Info("Waiting for caches to sync for resourceCache controller")
+	if !cache.WaitForCacheSync(stopCh, rc.cacheSynced) {
+		utilruntime.HandleError(fmt.Errorf("unable to sync caches for resourceCache controller"))
+		return
+	}
+	klog.Info("Caches are synced for resourceCache controller")
+
+	for i := 0; i < workers; i++ {
+		go wait.Until(rc.worker, time.Second, stopCh)
+	}
+	<-stopCh
+}
+
+func (rc *ResourceCacheController) worker() {
+	for rc.processNextWorkItem() {
+	}
+}
+
+func (rc *ResourceCacheController) processNextWorkItem() bool {
+	key, quit := rc.queue.Get()
+	if quit {
+		return false
+	}
+	defer rc.queue.Done(key)
+
+	err := rc.syncHandler(key.(string))
+	rc.handleErr(err, key)
+
+	return true
+}
+
+func (rc *ResourceCacheController) handleErr(err error, key interface{}) {
+	if err == nil || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
+		rc.queue.Forget(key)
+		return
+	}
+
+	ns, name, keyErr := cache.SplitMetaNamespaceKey(key.(string))
+	if keyErr != nil {
+		klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key)
+	}
+
+	if rc.queue.NumRequeues(key) < maxRetries {
+		klog.V(2).InfoS("Error syncing resourceCache", "resourceCache", klog.KRef(ns, name), "err", err)
+		rc.queue.AddRateLimited(key)
+		return
+	}
+
+	utilruntime.HandleError(err)
+	klog.V(2).InfoS("Dropping resourceCache out of the queue", "resourceCache", klog.KRef(ns, name), "err", err)
+	rc.queue.Forget(key)
+}
+
+// Connect returns the handler for a proxy request, routing it through the delegate chain.
+func (rc *ResourceCacheController) Connect(ctx context.Context, proxyPath string, responder rest.Responder) (http.Handler, error) {
+	return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		newReq := req.Clone(req.Context())
+		newReq.URL.Path = proxyPath
+		requestInfo := lifted.NewRequestInfo(newReq)
+
+		newCtx := request.WithRequestInfo(ctx, requestInfo)
+		newCtx = request.WithNamespace(newCtx, requestInfo.Namespace)
+		newReq = newReq.WithContext(newCtx)
+
+		gvr := schema.GroupVersionResource{
+			Group:    requestInfo.APIGroup,
+			Version:  requestInfo.APIVersion,
+			Resource: requestInfo.Resource,
+		}
+
+		h, err := rc.delegate.Connect(newCtx, delegate.ProxyRequest{
+			RequestInfo:          requestInfo,
+			GroupVersionResource: gvr,
+			ProxyPath:            proxyPath,
+			Responder:            responder,
+			HTTPReq:              newReq,
+		})
+
+		if err != nil {
+			h = &errorHTTPHandler{
+				requestInfo:          requestInfo,
+				err:                  err,
+				negotiatedSerializer: rc.negotiatedSerializer,
+			}
+		}
+
+		h = metrics.InstrumentHandlerFunc(requestInfo.Verb, requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource,
+			"", utils.KosmosClusrerLinkRroxyComponentName, false, "", h.ServeHTTP)
+		h.ServeHTTP(rw, newReq)
+	}), nil
+}
+
+type errorHTTPHandler struct {
+	requestInfo          *request.RequestInfo
+	err                  error
+	negotiatedSerializer runtime.NegotiatedSerializer
+}
+
+func (handler *errorHTTPHandler) ServeHTTP(delegate http.ResponseWriter, req *http.Request) {
+	// Write the error into the delegate ResponseWriter, wrapped in metrics.InstrumentHandlerFunc, so metrics can record this error.
+	gv := schema.GroupVersion{
+		Group:   handler.requestInfo.APIGroup,
+		Version: handler.requestInfo.APIVersion,
+	}
+	responsewriters.ErrorNegotiated(handler.err, handler.negotiatedSerializer, gv, delegate, req)
+}
diff --git a/pkg/clusterlink/proxy/controller/controller_test.go b/pkg/clusterlink/proxy/controller/controller_test.go
new file mode 100644
index 000000000..37edc99ac
--- /dev/null
+++ b/pkg/clusterlink/proxy/controller/controller_test.go
@@ -0,0 +1,91 @@
+package controller
+
+import (
+	"strings"
+	"testing"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	dynfake "k8s.io/client-go/dynamic"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/restmapper"
+
+	fakekosmosclient "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned/fake"
+	informerfactory "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions"
+)
+
+var apiGroupResources = []*restmapper.APIGroupResources{
+	{
+		Group: metav1.APIGroup{
+			Name: "apps",
+			Versions: []metav1.GroupVersionForDiscovery{
+				{GroupVersion: "apps/v1", Version: "v1"},
+			},
+			PreferredVersion: metav1.GroupVersionForDiscovery{
+				GroupVersion: "apps/v1", Version: "v1",
+			},
+		},
+		VersionedResources: map[string][]metav1.APIResource{
+			"v1": {
+				{Name: "deployments", SingularName: "deployment", Namespaced: true, Kind: "Deployment"},
+			},
+		},
+	},
+	{
+		Group: metav1.APIGroup{
+			Name: "",
+			Versions: []metav1.GroupVersionForDiscovery{
+				{GroupVersion: "v1", Version: "v1"},
+			},
+			PreferredVersion: metav1.GroupVersionForDiscovery{
+				GroupVersion: "v1", Version: "v1",
+			},
+		},
+		VersionedResources: map[string][]metav1.APIResource{
+			"v1": {
+				{Name: "pods", SingularName: "pod", Namespaced: true, Kind: "Pod"},
+			},
+		},
+	},
+}
+
+func TestNewResourceCacheController(t *testing.T) {
+	type args struct {
+		option NewControllerOption
+	}
+	dyClient, _ := dynfake.NewForConfig(&rest.Config{})
+	o := NewControllerOption{
+		DynamicClient: dyClient,
+		KosmosFactory: informerfactory.NewSharedInformerFactory(fakekosmosclient.NewSimpleClientset(), 0),
+		RestConfig:    &rest.Config{},
+		RestMapper:    restmapper.NewDiscoveryRESTMapper(apiGroupResources),
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    *ResourceCacheController
+		wantErr bool
+		errMsg  string
+	}{
+		{
+			name: "NewResourceCacheController",
+			args: args{
+				option: o,
+			},
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			_, err := NewResourceCacheController(tt.args.option)
+			if err == nil && tt.wantErr {
+				t.Fatal("expected an error, but got none")
+			}
+			if err != nil && !tt.wantErr {
+				t.Errorf("unexpected error, got: %v", err)
+			}
+			if err != nil && tt.wantErr && !strings.Contains(err.Error(), tt.errMsg) {
+				t.Errorf("expected error message %s to be in %s", tt.errMsg, err.Error())
+			}
+		})
+	}
+}
diff --git a/pkg/clusterlink/proxy/delegate/apiserver/apiserver.go b/pkg/clusterlink/proxy/delegate/apiserver/apiserver.go
new file mode 
100644 index 000000000..931c470cd --- /dev/null +++ b/pkg/clusterlink/proxy/delegate/apiserver/apiserver.go @@ -0,0 +1,68 @@ +package delegate + +import ( + "context" + "net/http" + "net/url" + "path" + + proxyutil "k8s.io/apimachinery/pkg/util/proxy" + restclient "k8s.io/client-go/rest" + + "github.com/kosmos.io/kosmos/pkg/clusterlink/proxy/delegate" +) + +const ( + order = 3000 +) + +type Apiserver struct { + proxyLocation *url.URL + proxyTransport http.RoundTripper +} + +var _ delegate.Delegate = &Apiserver{} + +func New(dep delegate.Dependency) (delegate.Delegate, error) { + location, err := url.Parse(dep.RestConfig.Host) + if err != nil { + return nil, err + } + + transport, err := restclient.TransportFor(dep.RestConfig) + if err != nil { + return nil, err + } + + return &Apiserver{ + proxyLocation: location, + proxyTransport: transport, + }, nil +} + +// Connect implements Delegate. +func (a *Apiserver) Connect(_ context.Context, request delegate.ProxyRequest) (http.Handler, error) { + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + location, transport := a.resourceLocation() + location.Path = path.Join(location.Path, request.ProxyPath) + location.RawQuery = req.URL.RawQuery + + handler := proxyutil.NewUpgradeAwareHandler(location, transport, true, false, proxyutil.NewErrorResponder(request.Responder)) + handler.ServeHTTP(rw, req) + }), nil +} + +// Order implements Delegate. +func (a *Apiserver) Order() int { + return order +} + +// SupportRequest implements Delegate. +func (a *Apiserver) SupportRequest(_ delegate.ProxyRequest) bool { + return true +} + +func (a *Apiserver) resourceLocation() (*url.URL, http.RoundTripper) { + location := *a.proxyLocation + return &location, a.proxyTransport +} diff --git a/pkg/clusterlink/proxy/delegate/cache/cache.go b/pkg/clusterlink/proxy/delegate/cache/cache.go new file mode 100644 index 000000000..3a56a1f43 --- /dev/null +++ b/pkg/clusterlink/proxy/delegate/cache/cache.go @@ -0,0 +1,135 @@ +package cache + +import ( + "context" + "net/http" + "time" + + "k8s.io/apimachinery/pkg/api/meta" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/endpoints/handlers" + "k8s.io/apiserver/pkg/registry/rest" + k8srest "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/client-go/kubernetes/scheme" + + "github.com/kosmos.io/kosmos/pkg/clusterlink/proxy/delegate" + "github.com/kosmos.io/kosmos/pkg/clusterlink/proxy/store" +) + +const ( + order = 1000 +) + +type Cache struct { + store store.Store + restMapper meta.RESTMapper + minRequestTimeout time.Duration +} + +var _ delegate.Delegate = (*Cache)(nil) + +func New(dep delegate.Dependency) delegate.Delegate { + return &Cache{ + store: dep.Store, + restMapper: dep.RestMapper, + minRequestTimeout: dep.MinRequestTimeout, + } +} + +// Connect implements Proxy. 
+func (c *Cache) Connect(_ context.Context, request delegate.ProxyRequest) (http.Handler, error) { + requestInfo := request.RequestInfo + r := &rester{ + store: c.store, + gvr: request.GroupVersionResource, + tableConvertor: k8srest.NewDefaultTableConvertor(request.GroupVersionResource.GroupResource()), + } + + gvk, err := c.restMapper.KindFor(request.GroupVersionResource) + if err != nil { + return nil, err + } + mapping, err := c.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) + if err != nil { + return nil, err + } + + scope := &handlers.RequestScope{ + Kind: gvk, + Resource: request.GroupVersionResource, + Namer: &handlers.ContextBasedNaming{ + Namer: meta.NewAccessor(), + ClusterScoped: mapping.Scope.Name() == meta.RESTScopeNameRoot, + }, + Serializer: scheme.Codecs.WithoutConversion(), + Convertor: runtime.NewScheme(), + Subresource: requestInfo.Subresource, + MetaGroupVersion: metav1.SchemeGroupVersion, + TableConvertor: r.tableConvertor, + } + + var h http.Handler + if requestInfo.Verb == "watch" || requestInfo.Name == "" { + // for list or watch + h = handlers.ListResource(r, r, scope, false, c.minRequestTimeout) + } else { + h = handlers.GetResource(r, scope) + } + return h, nil +} + +func (c *Cache) Order() int { + return order +} + +// SupportRequest implements Plugin +func (c *Cache) SupportRequest(request delegate.ProxyRequest) bool { + requestInfo := request.RequestInfo + + return requestInfo.IsResourceRequest && + c.store.HasResource(request.GroupVersionResource) && + requestInfo.Subresource == "" && + (requestInfo.Verb == "get" || + requestInfo.Verb == "list" || + requestInfo.Verb == "watch") +} + +type rester struct { + store store.Store + gvr schema.GroupVersionResource + tableConvertor rest.TableConvertor +} + +var _ rest.Getter = &rester{} +var _ rest.Lister = &rester{} +var _ rest.Watcher = &rester{} + +// Get implements rest.Getter interface +func (r *rester) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { + return r.store.Get(ctx, r.gvr, name, options) +} + +// Watch implements rest.Watcher interface +func (r *rester) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) { + return r.store.Watch(ctx, r.gvr, options) +} + +// List implements rest.Lister interface +func (r *rester) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) { + return r.store.List(ctx, r.gvr, options) +} + +// NewList implements rest.Lister interface +func (r *rester) NewList() runtime.Object { + return &unstructured.UnstructuredList{} +} + +// ConvertToTable implements rest.Lister interface +func (r *rester) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) { + return r.tableConvertor.ConvertToTable(ctx, object, tableOptions) +} diff --git a/pkg/clusterlink/proxy/delegate/interface.go b/pkg/clusterlink/proxy/delegate/interface.go new file mode 100644 index 000000000..90049908f --- /dev/null +++ b/pkg/clusterlink/proxy/delegate/interface.go @@ -0,0 +1,78 @@ +package delegate + +import ( + "context" + "fmt" + "net/http" + "sort" + "time" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" + clientrest "k8s.io/client-go/rest" + + store "github.com/kosmos.io/kosmos/pkg/clusterlink/proxy/store" +) + +type Proxy interface { + Connect(ctx context.Context, request ProxyRequest) (http.Handler, error) +} 
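To make the Delegate contract (defined just below) concrete: the chain picks the lowest-Order delegate whose SupportRequest returns true, so the cache delegate (order 1000) is tried before the apiserver fallback (order 3000). A hypothetical delegate slotted between them might look like this; denyDeletes and its order value are illustrative only, not part of this patch:

	// denyDeletes would short-circuit delete requests between the cache
	// delegate (order 1000) and the apiserver fallback (order 3000).
	type denyDeletes struct{}

	func (denyDeletes) Order() int { return 2000 }

	func (denyDeletes) SupportRequest(req ProxyRequest) bool {
		return req.RequestInfo.Verb == "delete"
	}

	func (denyDeletes) Connect(_ context.Context, _ ProxyRequest) (http.Handler, error) {
		return http.HandlerFunc(func(rw http.ResponseWriter, _ *http.Request) {
			http.Error(rw, "delete requests are not proxied", http.StatusForbidden)
		}), nil
	}
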
+ +type Delegate interface { + Proxy + Order() int + SupportRequest(request ProxyRequest) bool +} + +var _ Proxy = (*delegateChain)(nil) + +type Dependency struct { + RestConfig *clientrest.Config + RestMapper meta.RESTMapper + + MinRequestTimeout time.Duration + Store store.Store +} + +type ProxyRequest struct { + RequestInfo *request.RequestInfo + GroupVersionResource schema.GroupVersionResource + ProxyPath string + + Responder rest.Responder + HTTPReq *http.Request +} + +type delegateChain struct { + delegateList []Delegate +} + +func NewDelegateChain(delegateList []Delegate) Proxy { + sort.Slice(delegateList, func(i, j int) bool { + return delegateList[i].Order() < delegateList[j].Order() + }) + return &delegateChain{delegateList: delegateList} +} + +// Connect implements Proxy. +func (d *delegateChain) Connect(ctx context.Context, request ProxyRequest) (http.Handler, error) { + proxy, err := d.selectDelegate(request) + if err != nil { + return nil, err + } + + return proxy.Connect(ctx, request) +} + +func (d *delegateChain) selectDelegate(request ProxyRequest) (Delegate, error) { + for _, delegate := range d.delegateList { + if delegate.SupportRequest(request) { + return delegate, nil + } + } + + return nil, fmt.Errorf("no plugin found for request: %v %v", + request.RequestInfo.Verb, request.RequestInfo.Path) +} diff --git a/pkg/clusterlink/proxy/storage.go b/pkg/clusterlink/proxy/storage.go deleted file mode 100644 index deec6e6d2..000000000 --- a/pkg/clusterlink/proxy/storage.go +++ /dev/null @@ -1,239 +0,0 @@ -package proxy - -import ( - "context" - "fmt" - "net/http" - "net/url" - "path" - "time" - - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" - metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/proxy" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/apiserver/pkg/endpoints/handlers" - "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" - "k8s.io/apiserver/pkg/endpoints/request" - genericrequest "k8s.io/apiserver/pkg/endpoints/request" - k8srest "k8s.io/apiserver/pkg/registry/rest" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" - clientrest "k8s.io/client-go/rest" - "sigs.k8s.io/controller-runtime/pkg/client/apiutil" - - "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" - "github.com/kosmos.io/kosmos/pkg/clusterlink/proxy/store" - "github.com/kosmos.io/kosmos/pkg/utils/lifted" -) - -var supportMethods = []string{"GET", "DELETE", "POST", "PUT", "PATCH", "HEAD", "OPTIONS"} - -type REST struct { - store store.Store - config *rest.Config - restMapper meta.RESTMapper -} - -func NewREST(config *rest.Config, client dynamic.Interface) *REST { - restMapper, err := apiutil.NewDynamicRESTMapper(config) - if err != nil { - panic(err) - } - store := store.NewClusterCache(client, restMapper) - return &REST{ - store: store, - config: config, - restMapper: restMapper, - } -} - -// New return empty Proxy object. -func (rest *REST) New() runtime.Object { - return &v1alpha1.Proxy{} -} - -// NamespaceScoped returns false because Storage is not namespaced. -func (rest *REST) NamespaceScoped() bool { - return false -} - -// ConnectMethods returns the list of HTTP methods handled by Connect. 
-func (rest *REST) ConnectMethods() []string { - return supportMethods -} - -// NewConnectOptions returns an empty options object that will be used to pass options to the Connect method. -func (rest *REST) NewConnectOptions() (runtime.Object, bool, string) { - return nil, true, "" -} - -// Connect returns a handler for proxy. -func (rest *REST) Connect(ctx context.Context, _ string, _ runtime.Object, responder k8srest.Responder) (http.Handler, error) { - info, ok := genericrequest.RequestInfoFrom(ctx) - if !ok { - return nil, fmt.Errorf("no RequestInfo found in the context") - } - - proxyHandler := rest.createProxyHandler(responder) - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - paths := []string{info.APIPrefix, info.APIGroup, info.APIVersion, info.Resource} - serverPrefix := "/" + path.Join(paths...) - http.StripPrefix(serverPrefix, rest.createHandler(ctx, 300*time.Second, proxyHandler)).ServeHTTP(w, req) - }), nil -} - -func (rest *REST) createHandler(ctx context.Context, minRequestTimeout time.Duration, proxyHandler http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - newReq := req.Clone(req.Context()) - requestInfo := lifted.NewRequestInfo(req) - - newCtx := request.WithRequestInfo(ctx, requestInfo) - newCtx = request.WithNamespace(newCtx, requestInfo.Namespace) - newReq = newReq.WithContext(newCtx) - - gvr := schema.GroupVersionResource{ - Group: requestInfo.APIGroup, - Version: requestInfo.APIVersion, - Resource: requestInfo.Resource, - } - - if gvr.Group == "" && gvr.Resource == "" { - proxyHandler.ServeHTTP(w, req) - return - } - - r := &rester{ - store: rest.store, - gvr: gvr, - tableConvertor: k8srest.NewDefaultTableConvertor(gvr.GroupResource()), - } - gvk, err := rest.restMapper.KindFor(gvr) - if err != nil { - responsewriters.ErrorNegotiated( - apierrors.NewInternalError(err), - Codecs, schema.GroupVersion{}, w, req, - ) - return - } - - mapping, err := rest.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) - if err != nil { - responsewriters.ErrorNegotiated( - apierrors.NewInternalError(err), - Codecs, schema.GroupVersion{}, w, req, - ) - return - } - clusterScoped := mapping.Scope.Name() == meta.RESTScopeNameRoot - scope := &handlers.RequestScope{ - Kind: gvk, - Resource: gvr, - Namer: &handlers.ContextBasedNaming{ - Namer: meta.NewAccessor(), - ClusterScoped: clusterScoped, - }, - Serializer: scheme.Codecs.WithoutConversion(), - Convertor: runtime.NewScheme(), - Subresource: requestInfo.Subresource, - MetaGroupVersion: metav1.SchemeGroupVersion, - } - var h http.Handler - switch requestInfo.Verb { - case "get": - h = handlers.GetResource(r, scope) - case "list", "watch": - h = handlers.ListResource(r, r, scope, false, minRequestTimeout) - case "patch", "update", "create", "delete": - h = proxyHandler - default: - responsewriters.ErrorNegotiated( - apierrors.NewMethodNotSupported(gvr.GroupResource(), requestInfo.Verb), - Codecs, gvr.GroupVersion(), w, req, - ) - } - if h != nil { - h.ServeHTTP(w, newReq) - } - }) -} - -func (rest *REST) createProxyHandler(responder k8srest.Responder) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - kubernetes, err := url.Parse(rest.config.Host) - if err != nil { - handlerError(w, req, err) - return - } - s := *req.URL - s.Host = kubernetes.Host - s.Scheme = kubernetes.Scheme - req.Header.Del("Authorization") - defaultTransport, err := clientrest.TransportFor(rest.config) - if err != nil { - handlerError(w, req, 
err) - return - } - httpProxy := proxy.NewUpgradeAwareHandler(&s, defaultTransport, true, false, proxy.NewErrorResponder(responder)) - httpProxy.UpgradeTransport = proxy.NewUpgradeRequestRoundTripper(defaultTransport, defaultTransport) - httpProxy.ServeHTTP(w, req) - }) -} - -func handlerError(w http.ResponseWriter, req *http.Request, err error) int { - return responsewriters.ErrorNegotiated( - apierrors.NewInternalError(err), - Codecs, schema.GroupVersion{}, w, req, - ) -} - -// Destroy cleans up its resources on shutdown. -func (rest *REST) Destroy() { - // Given no underlying store, so we don't - // need to destroy anything. -} - -type rester struct { - store store.Store - gvr schema.GroupVersionResource - tableConvertor k8srest.TableConvertor -} - -var _ k8srest.Getter = &rester{} -var _ k8srest.Lister = &rester{} -var _ k8srest.Watcher = &rester{} - -// Get implements rest.Getter interface -func (r *rester) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { - return r.store.Get(ctx, r.gvr, name, options) -} - -// Watch implements rest.Watcher interface -func (r *rester) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) { - return r.store.Watch(ctx, r.gvr, options) -} - -// List implements rest.Lister interface -func (r *rester) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) { - return r.store.List(ctx, r.gvr, options) -} - -// NewList implements rest.Lister interface -func (r *rester) NewList() runtime.Object { - return &unstructured.UnstructuredList{} -} - -// ConvertToTable implements rest.Lister interface -func (r *rester) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) { - return r.tableConvertor.ConvertToTable(ctx, object, tableOptions) -} - -func (r *rester) New() runtime.Object { - return &unstructured.Unstructured{} -} diff --git a/pkg/clusterlink/proxy/store/cluster_cache.go b/pkg/clusterlink/proxy/store/cluster_cache.go index fb3ee9993..1ede02f1d 100644 --- a/pkg/clusterlink/proxy/store/cluster_cache.go +++ b/pkg/clusterlink/proxy/store/cluster_cache.go @@ -15,11 +15,14 @@ import ( "k8s.io/apiserver/pkg/registry/rest" "k8s.io/client-go/dynamic" "k8s.io/klog/v2" + + "github.com/kosmos.io/kosmos/pkg/utils" ) // Store is the cache for resources from controlpanel type Store interface { - UpdateCache(resources map[schema.GroupVersionResource]struct{}) error + UpdateCache(resources map[schema.GroupVersionResource]*utils.MultiNamespace) error + HasResource(resource schema.GroupVersionResource) bool GetResourceFromCache(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) Stop() @@ -28,53 +31,44 @@ type Store interface { Watch(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (watch.Interface, error) } -var _ Store = &Cache{} +var _ Store = &ClusterCache{} -// Cache caches resources -type Cache struct { +// ClusterCache caches resources +type ClusterCache struct { lock sync.RWMutex cache map[schema.GroupVersionResource]*resourceCache restMapper meta.RESTMapper client dynamic.Interface } -func NewClusterCache(client dynamic.Interface, restMapper meta.RESTMapper) *Cache { - // TODO add controller dynamic add clusterlink crd - cache := &Cache{ +// ReadinessCheck implements Store. 
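A minimal usage sketch of the Store contract defined above (hypothetical wiring; it assumes a dynamic client, a REST mapper, and a request context are already in scope):

	// Start caching pods, but only in the "kosmos-system" namespace.
	cc := NewClusterCache(dynamicClient, restMapper)
	defer cc.Stop()

	podsGVR := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
	nsSelector := utils.NewMultiNamespace()
	nsSelector.Add("kosmos-system")

	if err := cc.UpdateCache(map[schema.GroupVersionResource]*utils.MultiNamespace{
		podsGVR: nsSelector,
	}); err != nil {
		klog.Errorf("failed to update cache: %v", err)
	}

	// The cache delegate uses HasResource to decide whether it can serve a request.
	if cc.HasResource(podsGVR) {
		// The request namespace travels in ctx; Get itself takes only the name.
		obj, err := cc.Get(ctx, podsGVR, "some-pod", &metav1.GetOptions{})
		if err != nil {
			klog.Errorf("cache get failed: %v", err)
		} else {
			klog.Infof("got %s from cache", obj.GetObjectKind().GroupVersionKind())
		}
	}
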
+ +func NewClusterCache(client dynamic.Interface, restMapper meta.RESTMapper) *ClusterCache { + cache := &ClusterCache{ client: client, restMapper: restMapper, cache: map[schema.GroupVersionResource]*resourceCache{}, } - resources := map[schema.GroupVersionResource]struct{}{ - {Group: "kosmos.io", Version: "v1alpha1", Resource: "clusters"}: {}, - {Group: "kosmos.io", Version: "v1alpha1", Resource: "clusternodes"}: {}, - {Group: "kosmos.io", Version: "v1alpha1", Resource: "nodeconfigs"}: {}, - } - err := cache.UpdateCache(resources) - if err != nil { - panic(err) - } return cache } -func (c *Cache) UpdateCache(resources map[schema.GroupVersionResource]struct{}) error { +func (c *ClusterCache) UpdateCache(resources map[schema.GroupVersionResource]*utils.MultiNamespace) error { c.lock.Lock() defer c.lock.Unlock() - // remove non-exist resources - for resource := range c.cache { - if _, exist := resources[resource]; !exist { - klog.Infof("Remove cache for %s", resource.String()) - c.cache[resource].stop() - delete(c.cache, resource) + // remove non-exist resources and namespaces changed resource + for gvr, rc := range c.cache { + if namespaces, exist := resources[gvr]; !exist || !rc.namespaces.Equal(namespaces) { + klog.Infof("Remove cache for %s", gvr.String()) + c.cache[gvr].stop() + delete(c.cache, gvr) } } // add resource cache - for resource := range resources { - _, exist := c.cache[resource] - if !exist { - kind, err := c.restMapper.KindFor(resource) + for gvr, namespaces := range resources { + if _, exist := c.cache[gvr]; !exist { + kind, err := c.restMapper.KindFor(gvr) if err != nil { return err } @@ -84,18 +78,18 @@ func (c *Cache) UpdateCache(resources map[schema.GroupVersionResource]struct{}) } namespaced := mapping.Scope.Name() == meta.RESTScopeNameNamespace - klog.Infof("Add cache for %s", resource.String()) - cache, err := newResourceCache(resource, kind, namespaced, c.clientForResourceFunc(resource)) + klog.Infof("Add cache for %s", gvr.String()) + cache, err := newResourceCache(gvr, kind, namespaced, namespaces, c.clientForResourceFunc(gvr)) if err != nil { return err } - c.cache[resource] = cache + c.cache[gvr] = cache } } return nil } -func (c *Cache) Stop() { +func (c *ClusterCache) Stop() { c.lock.RLock() defer c.lock.RUnlock() @@ -104,11 +98,11 @@ func (c *Cache) Stop() { } } -func (c *Cache) GetResourceFromCache(_ context.Context, _ schema.GroupVersionResource, _, _ string) (runtime.Object, string, error) { +func (c *ClusterCache) GetResourceFromCache(_ context.Context, _ schema.GroupVersionResource, _, _ string) (runtime.Object, string, error) { return nil, "", nil } -func (c *Cache) Get(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error) { +func (c *ClusterCache) Get(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error) { rc := c.cacheForResource(gvr) if rc == nil { return nil, fmt.Errorf("can not find gvr %v", gvr) @@ -122,7 +116,7 @@ func (c *Cache) Get(ctx context.Context, gvr schema.GroupVersionResource, name s return cloneObj, err } -func (c *Cache) Update(ctx context.Context, gvr schema.GroupVersionResource, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { +func (c *ClusterCache) Update(ctx context.Context, gvr schema.GroupVersionResource, name string, objInfo 
rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { rc := c.cacheForResource(gvr) if rc == nil { // TODO @@ -131,12 +125,14 @@ func (c *Cache) Update(ctx context.Context, gvr schema.GroupVersionResource, nam return rc.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) } -func (c *Cache) List(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (runtime.Object, error) { +func (c *ClusterCache) List(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (runtime.Object, error) { rc := c.cacheForResource(gvr) if rc == nil { - // TODO return nil, fmt.Errorf("can not find target gvr %v", gvr) } + if options.ResourceVersion == "" { + options.ResourceVersion = "0" + } list, err := rc.List(ctx, options) if err != nil { return nil, err @@ -144,7 +140,7 @@ func (c *Cache) List(ctx context.Context, gvr schema.GroupVersionResource, optio return list, nil } -func (c *Cache) Watch(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (watch.Interface, error) { +func (c *ClusterCache) Watch(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (watch.Interface, error) { rc := c.cacheForResource(gvr) if rc == nil { return nil, fmt.Errorf("can not find target gvr %v", gvr) @@ -156,13 +152,25 @@ func (c *Cache) Watch(ctx context.Context, gvr schema.GroupVersionResource, opti return w, nil } -func (c *Cache) clientForResourceFunc(resource schema.GroupVersionResource) func() (dynamic.NamespaceableResourceInterface, error) { +func (c *ClusterCache) HasResource(resource schema.GroupVersionResource) bool { + c.lock.RLock() + defer c.lock.RUnlock() + _, ok := c.cache[resource] + return ok +} + +// get the client for the resource +func (c *ClusterCache) clientForResourceFunc(resource schema.GroupVersionResource) func() (dynamic.NamespaceableResourceInterface, error) { return func() (dynamic.NamespaceableResourceInterface, error) { + if c.client == nil { + return nil, errors.New("client is nil") + } return c.client.Resource(resource), nil } } -func (c *Cache) cacheForResource(gvr schema.GroupVersionResource) *resourceCache { +// get the cache for the resource +func (c *ClusterCache) cacheForResource(gvr schema.GroupVersionResource) *resourceCache { c.lock.RLock() defer c.lock.RUnlock() return c.cache[gvr] diff --git a/pkg/clusterlink/proxy/store/resource_cache.go b/pkg/clusterlink/proxy/store/resource_cache.go index b882dc61e..1093713dc 100644 --- a/pkg/clusterlink/proxy/store/resource_cache.go +++ b/pkg/clusterlink/proxy/store/resource_cache.go @@ -22,12 +22,15 @@ import ( "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + + "github.com/kosmos.io/kosmos/pkg/utils" ) -// resourceCache cache one kind resource from single member cluster +// resourceCache cache one kind resource from cluster type resourceCache struct { *genericregistry.Store - resource schema.GroupVersionResource + resource schema.GroupVersionResource + namespaces *utils.MultiNamespace } func (c *resourceCache) stop() { @@ -36,7 +39,7 @@ func (c *resourceCache) stop() { } func newResourceCache(gvr schema.GroupVersionResource, gvk schema.GroupVersionKind, - namespaced bool, newClientFunc func() (dynamic.NamespaceableResourceInterface, error)) (*resourceCache, error) { + 
diff --git a/pkg/clusterlink/proxy/store/store.go b/pkg/clusterlink/proxy/store/store.go
index f36d96c5b..73a3f064e 100644
--- a/pkg/clusterlink/proxy/store/store.go
+++ b/pkg/clusterlink/proxy/store/store.go
@@ -6,32 +6,42 @@ import (
 	"strings"
 
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/apiserver/pkg/storage"
 	"k8s.io/client-go/dynamic"
+
+	"github.com/kosmos.io/kosmos/pkg/utils"
 )
 
-// store implements storage.Interface, Providing Get/Watch/List resources from member clusters.
+// store implements storage.Interface as the underlying storage,
+// providing Get/Watch/List of resources from clusters.
 type store struct {
+	gvr       schema.GroupVersionResource
+	namespace *utils.MultiNamespace
+	// newClientFunc returns a resource client
+	newClientFunc func() (dynamic.NamespaceableResourceInterface, error)
+
 	versioner storage.Versioner
 	prefix    string
-	// newClientFunc returns a resource client for member cluster apiserver
-	newClientFunc func() (dynamic.NamespaceableResourceInterface, error)
 }
 
 var _ storage.Interface = &store{}
 
-func newStore(newClientFunc func() (dynamic.NamespaceableResourceInterface, error), versioner storage.Versioner, prefix string) *store {
+func newStore(gvr schema.GroupVersionResource, newClientFunc func() (dynamic.NamespaceableResourceInterface, error), multiNS *utils.MultiNamespace, versioner storage.Versioner, prefix string) *store {
 	return &store{
 		newClientFunc: newClientFunc,
 		versioner:     versioner,
 		prefix:        prefix,
+		namespace:     multiNS,
+		gvr:           gvr,
 	}
 }
 
-// Versioner implements storage.Interface.
 func (s *store) Versioner() storage.Versioner {
 	return s.versioner
 }
@@ -39,14 +49,16 @@ func (s *store) Versioner() storage.Versioner {
 // Get implements storage.Interface.
 func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
 	var namespace, name string
+	// parse the namespace and name from the key
 	part1, part2 := s.splitKey(key)
 	if part2 == "" {
-		// for cluster scope resource, key is /prefix/name. So parts are [name, ""]
 		name = part1
 	} else {
-		// for namespace scope resource, key is /prefix/namespace/name. So parts are [namespace, name]
 		namespace, name = part1, part2
 	}
+	if namespace != metav1.NamespaceAll && !s.namespace.Contains(namespace) {
+		return apierrors.NewNotFound(s.gvr.GroupResource(), name)
+	}
 
 	client, err := s.client(namespace)
 	if err != nil {
@@ -57,7 +69,7 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ob
 	if err != nil {
 		return err
 	}
-
+	setEmptyManagedFields(obj)
 	unstructuredObj := objPtr.(*unstructured.Unstructured)
 	obj.DeepCopyInto(unstructuredObj)
 	return nil
 }
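
The keys handed to Get keep the layout the removed comments documented: "/prefix/name" for cluster-scoped resources and "/prefix/namespace/name" for namespaced ones. A hypothetical stand-alone helper that mirrors splitKey's contract, for illustration only (the real splitKey is internal to the store):

    // parseKey mirrors the store's key layout (illustrative, not part of the patch).
    // prefix is gvr.Group + "/" + gvr.Resource, e.g. "kosmos.io/clusternodes".
    func parseKey(key, prefix string) (namespace, name string) {
        rest := strings.TrimPrefix(strings.TrimPrefix(key, "/"), prefix+"/")
        parts := strings.SplitN(rest, "/", 2)
        if len(parts) == 1 {
            return "", parts[0] // cluster scoped: /prefix/name
        }
        return parts[0], parts[1] // namespaced: /prefix/namespace/name
    }

With the namespace gate added above, a Get against a namespace outside the cached set now fails fast with NotFound instead of reaching the backing client.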
@@ -74,19 +86,39 @@
 	// For namespace scope resources, key is /prefix/namespace. Parts are [namespace, ""]
 	namespace, _ := s.splitKey(key)
 
-	client, err := s.client(namespace)
+	reqNS, objFilter, shortCircuit := filterNS(s.namespace, namespace)
+	if shortCircuit {
+		return nil
+	}
+
+	client, err := s.client(reqNS)
 	if err != nil {
 		return err
 	}
 
-	options := convertToMetaV1ListOptions(opts)
-	objects, err := client.List(ctx, options)
+	objects, err := client.List(ctx, convertToMetaV1ListOptions(opts))
 	if apierrors.IsNotFound(err) {
 		return nil
 	}
 	if err != nil {
 		return err
 	}
+
+	filteredItems := make([]unstructured.Unstructured, 0, len(objects.Items))
+	for i := range objects.Items {
+		// pass a pointer so the managed fields are cleared on the stored item
+		setEmptyManagedFields(&objects.Items[i])
+		if objFilter == nil || objFilter(&objects.Items[i]) {
+			filteredItems = append(filteredItems, objects.Items[i])
+		}
+	}
+	objects.Items = filteredItems
+
 	objects.DeepCopyInto(listObj.(*unstructured.UnstructuredList))
 	return nil
 }
@@ -101,14 +133,28 @@
 	// For cluster scope resources, key is /prefix. Parts are ["", ""]
 	// For namespace scope resources, key is /prefix/namespace. Parts are [namespace, ""]
 	namespace, _ := s.splitKey(key)
+	reqNS, objFilter, shortCircuit := filterNS(s.namespace, namespace)
+	if shortCircuit {
+		return watch.NewEmptyWatch(), nil
+	}
 
-	client, err := s.client(namespace)
+	client, err := s.client(reqNS)
 	if err != nil {
 		return nil, err
 	}
 
-	options := convertToMetaV1ListOptions(opts)
-	return client.Watch(ctx, options)
+	watcher, err := client.Watch(ctx, convertToMetaV1ListOptions(opts))
+	if err != nil {
+		return nil, err
+	}
+	watcher = watch.Filter(watcher, func(in watch.Event) (watch.Event, bool) {
+		setEmptyManagedFields(in.Object)
+		if objFilter != nil {
+			return in, objFilter(in.Object)
+		}
+		return in, true
+	})
+	return watcher, nil
 }
 
 // Create implements storage.Interface.
@@ -159,3 +205,50 @@ func (s *store) splitKey(key string) (string, string) {
 	}
 	return part0, part1
 }
+
+// filterNS returns the namespace to request and a filter function for objects,
+// so that only resources from cached namespaces are served.
+func filterNS(cached *utils.MultiNamespace, request string) (reqNS string, objFilter func(runtime.Object) bool, shortCircuit bool) {
+	if cached.IsAll {
+		reqNS = request
+		return
+	}
+
+	if ns, ok := cached.Single(); ok {
+		if request == metav1.NamespaceAll || request == ns {
+			reqNS = ns
+		} else {
+			shortCircuit = true
+		}
+		return
+	}
+
+	if request == metav1.NamespaceAll {
+		reqNS = metav1.NamespaceAll
+		objFilter = objectFilter(cached)
+	} else if cached.Contains(request) {
+		reqNS = request
+	} else {
+		shortCircuit = true
+	}
+	return
+}
+
+// objectFilter returns a predicate that keeps objects in the cached namespaces.
+func objectFilter(ns *utils.MultiNamespace) func(o runtime.Object) bool {
+	return func(o runtime.Object) bool {
+		accessor, err := meta.Accessor(o)
+		if err != nil {
+			return true
+		}
+		return ns.Contains(accessor.GetNamespace())
+	}
+}
+
+// setEmptyManagedFields clears the managed fields to shrink cached objects
+func setEmptyManagedFields(obj interface{}) {
+	if accessor, err := meta.Accessor(obj); err == nil {
+		if accessor.GetManagedFields() != nil {
+			accessor.SetManagedFields(nil)
+		}
+	}
+}
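
filterNS collapses to three cases; a short walk-through with made-up namespaces:

    cached := utils.NewMultiNamespace()
    cached.Add("ns1", "ns2")

    reqNS, objFilter, short := filterNS(cached, metav1.NamespaceAll)
    // reqNS == "" (all), objFilter != nil, short == false: list everything, filter to ns1/ns2

    reqNS, objFilter, short = filterNS(cached, "ns1")
    // reqNS == "ns1", objFilter == nil, short == false: pass through unchanged

    reqNS, objFilter, short = filterNS(cached, "ns3")
    // reqNS == "", objFilter == nil, short == true: serve an empty result

The single-namespace case additionally rewrites a NamespaceAll request down to that one namespace, keeping the upstream query as narrow as possible.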
diff --git a/pkg/registry/clusterlink/storage/proxy_test.go b/pkg/registry/clusterlink/storage/proxy_test.go
index 7a88cc110..87c28d2a7 100644
--- a/pkg/registry/clusterlink/storage/proxy_test.go
+++ b/pkg/registry/clusterlink/storage/proxy_test.go
@@ -4,10 +4,11 @@ import (
 	"context"
 	"testing"
 
-	proxyv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/proxy/v1alpha1"
-	clusterlinkproxy "github.com/kosmos.io/kosmos/pkg/clusterlink/proxy/controller"
 	"github.com/stretchr/testify/assert"
 	genericrequest "k8s.io/apiserver/pkg/endpoints/request"
+
+	proxyv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/proxy/v1alpha1"
+	clusterlinkproxy "github.com/kosmos.io/kosmos/pkg/clusterlink/proxy/controller"
 )
 
 func TestNewProxyREST(t *testing.T) {
@@ -48,7 +49,7 @@ func TestNewProxyREST_NewConnectOptions(t *testing.T) {
 	assert.Equal(t, "", s)
 }
 
-func TestProxyREST_Destroy(t *testing.T) {
+func TestProxyREST_Destroy(_ *testing.T) {
 	ctl := clusterlinkproxy.ResourceCacheController{}
 	r := NewProxyREST(&ctl)
 	r.Destroy()
diff --git a/pkg/sharedcli/profileflag/profileflag.go b/pkg/sharedcli/profileflag/profileflag.go
new file mode 100644
index 000000000..433448805
--- /dev/null
+++ b/pkg/sharedcli/profileflag/profileflag.go
@@ -0,0 +1,101 @@
+/*
+Copyright 2025.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package profileflag
+
+import (
+	"net/http"
+	"net/http/pprof"
+	"os"
+	"time"
+
+	"github.com/spf13/pflag"
+	"k8s.io/klog/v2"
+)
+
+const (
+	// ReadHeaderTimeout is the amount of time allowed to read
+	// request headers.
+	// HTTP timeouts are necessary to expire inactive connections
+	// and failing to do so might make the application vulnerable
+	// to attacks like slowloris which work by sending data very slowly,
+	// which in case of no timeout will keep the connection active
+	// eventually leading to a denial-of-service (DoS) attack.
+	// References:
+	// - https://en.wikipedia.org/wiki/Slowloris_(computer_security)
+	ReadHeaderTimeout = 32 * time.Second
+	// WriteTimeout is the amount of time allowed to write the
+	// response data.
+	// HTTP timeouts are necessary to expire inactive connections
+	// and failing to do so might make the application vulnerable
+	// to attacks like slowloris which work by sending data very slowly,
+	// which in case of no timeout will keep the connection active
+	// eventually leading to a denial-of-service (DoS) attack.
+	WriteTimeout = 5 * time.Minute
+	// ReadTimeout is the amount of time allowed to read the
+	// request data.
+	// HTTP timeouts are necessary to expire inactive connections
+	// and failing to do so might make the application vulnerable
+	// to attacks like slowloris which work by sending data very slowly,
+	// which in case of no timeout will keep the connection active
+	// eventually leading to a denial-of-service (DoS) attack.
+	ReadTimeout = 5 * time.Minute
+)
+
+// Options are options for pprof.
+type Options struct {
+	// EnableProfile is the flag about whether to enable pprof profiling.
+	EnableProfile bool
+	// ProfilingBindAddress is the TCP address for pprof profiling.
+	// Defaults to :6060 if unspecified.
+	ProfilingBindAddress string
+}
+
+// AddFlags adds flags to the specified FlagSet.
+func (o *Options) AddFlags(fs *pflag.FlagSet) {
+	fs.BoolVar(&o.EnableProfile, "enable-pprof", false, "Enable profiling via web interface host:port/debug/pprof/.")
+	fs.StringVar(&o.ProfilingBindAddress, "profiling-bind-address", ":6060", "The TCP address for serving profiling (e.g. 127.0.0.1:6060, :6060). This is only applicable if profiling is enabled.")
+}
+
+func installHandlerForPProf(mux *http.ServeMux) {
+	mux.HandleFunc("/debug/pprof/", pprof.Index)
+	mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
+	mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
+	mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
+	mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
+}
+
+// ListenAndServe starts an HTTP server to serve pprof when profiling is enabled.
+func ListenAndServe(opts Options) {
+	if opts.EnableProfile {
+		mux := http.NewServeMux()
+		installHandlerForPProf(mux)
+		klog.Infof("Starting profiling server on %s", opts.ProfilingBindAddress)
+		go func() {
+			httpServer := http.Server{
+				Addr:              opts.ProfilingBindAddress,
+				Handler:           mux,
+				ReadHeaderTimeout: ReadHeaderTimeout,
+				WriteTimeout:      WriteTimeout,
+				ReadTimeout:       ReadTimeout,
+			}
+			if err := httpServer.ListenAndServe(); err != nil {
+				klog.Errorf("Failed to enable profiling: %v", err)
+				os.Exit(1)
+			}
+		}()
+	}
+}
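
The package is consumed the same way as the other sharedcli helpers; a sketch of the expected wiring (mirroring how clusterlink-proxy registers it in its command setup, abbreviated here):

    opts := profileflag.Options{}
    opts.AddFlags(cmd.Flags())

    // before starting the main server:
    profileflag.ListenAndServe(opts)

With --enable-pprof set, profiles are then served under http://<profiling-bind-address>/debug/pprof/.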
diff --git a/pkg/utils/constants.go b/pkg/utils/constants.go
index c4f15c42c..855077963 100644
--- a/pkg/utils/constants.go
+++ b/pkg/utils/constants.go
@@ -197,3 +197,11 @@ var GVR_SERVICE = schema.GroupVersionResource{
 	Version:  "v1",
 	Resource: "services",
 }
+
+const (
+	KosmosClusrerLinkRroxyComponentName = "clusterlink-proxy"
+)
+
+const (
+	ClusterLinkOpenAPIVersion = "1.0"
+)
diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go
index e6636f67d..7a48f0970 100644
--- a/pkg/utils/utils.go
+++ b/pkg/utils/utils.go
@@ -7,6 +7,7 @@ import (
 	"strings"
 
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
 	netutils "k8s.io/utils/net"
 )
@@ -81,3 +82,50 @@ func HasKosmosNodeLabel(node *corev1.Node) bool {
 
 	return false
 }
+
+// MultiNamespace represents a set of namespaces, or all namespaces.
+type MultiNamespace struct {
+	IsAll      bool
+	namespaces sets.Set[string]
+}
+
+func NewMultiNamespace() *MultiNamespace {
+	return &MultiNamespace{
+		namespaces: sets.New[string](),
+	}
+}
+
+// Add marks the given namespaces as included.
+// Calling Add with no arguments marks all namespaces.
+func (n *MultiNamespace) Add(ns ...string) {
+	if len(ns) == 0 {
+		n.IsAll = true
+		n.namespaces = nil
+		return
+	}
+	if n.IsAll || n.namespaces.HasAll(ns...) {
+		return
+	}
+	n.namespaces.Insert(ns...)
+}
+
+// Contains reports whether all the given namespaces are included.
+func (n *MultiNamespace) Contains(ns ...string) bool {
+	return n.IsAll || n.namespaces.HasAll(ns...)
+}
+
+// Single returns the namespace and true if exactly one namespace is included.
+func (n *MultiNamespace) Single() (string, bool) {
+	if n.IsAll || n.namespaces.Len() != 1 {
+		return "", false
+	}
+
+	// reaching here means there is exactly one namespace, so it is safe to take it
+	ns := sets.List(n.namespaces)[0]
+	return ns, true
+}
+
+// Equal reports whether two MultiNamespace values include the same namespaces.
+func (n *MultiNamespace) Equal(another *MultiNamespace) bool {
+	if n.IsAll != another.IsAll {
+		return false
+	}
+	if n.IsAll && another.IsAll {
+		return true
+	}
+	return n.namespaces.Equal(another.namespaces)
+}
diff --git a/pkg/utils/utils_test.go b/pkg/utils/utils_test.go
index d08909d77..805599659 100644
--- a/pkg/utils/utils_test.go
+++ b/pkg/utils/utils_test.go
@@ -6,6 +6,7 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
 )
 
 func TestContainsStringInUtils(t *testing.T) {
@@ -116,3 +117,43 @@ func TestFormatCIDRInUtils(t *testing.T) {
 		}
 	}
 }
+
+func TestMultiNamespace(t *testing.T) {
+	type fields struct {
+		IsAll      bool
+		namespaces sets.Set[string]
+	}
+	type args struct {
+		ns []string
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		args   args
+	}{
+		{
+			name: "case1",
+			fields: fields{
+				IsAll:      false,
+				namespaces: sets.Set[string]{},
+			},
+			args: args{
+				ns: []string{"ns1", "ns2"},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			n1 := NewMultiNamespace()
+			n1.Add(tt.args.ns...)
+			n2 := NewMultiNamespace()
+			n2.Add(tt.args.ns...)
+			_, single := n1.Single()
+			assert.False(t, n1.IsAll)
+			assert.Equal(t, 2, n1.namespaces.Len())
+			assert.True(t, n1.namespaces.HasAll("ns1", "ns2"))
+			assert.True(t, n1.Equal(n2))
+			assert.False(t, single)
+		})
+	}
+}
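
To round out the MultiNamespace additions above, a short usage sketch of the exported behavior:

    ns := utils.NewMultiNamespace()
    ns.Add("ns1", "ns2")
    ns.Contains("ns1")        // true
    ns.Contains("ns1", "ns3") // false
    _, ok := ns.Single()      // ok == false: more than one namespace

    all := utils.NewMultiNamespace()
    all.Add()                // no arguments marks all namespaces
    all.Contains("anything") // true
    all.Equal(ns)            // false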