Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: clusterlink-proxy aggregator apiserver provider resource cache and proxy services #814

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 35 additions & 22 deletions cmd/clusterlink/proxy/app/clusterlink-proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,26 @@ import (
"fmt"

"github.com/spf13/cobra"
genericapiserver "k8s.io/apiserver/pkg/server"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"

"github.com/kosmos.io/kosmos/cmd/clusterlink/proxy/app/options"
"github.com/kosmos.io/kosmos/pkg/sharedcli/klogflag"
profileflag "github.com/kosmos.io/kosmos/pkg/sharedcli/profileflag"
"github.com/kosmos.io/kosmos/pkg/utils"
)

// NewClusterLinkProxyCommand creates a *cobra.Command object with default parameters
func NewClusterLinkProxyCommand(ctx context.Context) *cobra.Command {
opts := options.NewOptions()

cmd := &cobra.Command{
Use: "proxy",
Long: `The proxy starts a apiserver for agent access the backend proxy`,
RunE: func(cmd *cobra.Command, args []string) error {
// validate options
/*
if errs := opts.Validate(); len(errs) != 0 {
return errs.ToAggregate()
}
*/
Use: utils.KosmosClusrerLinkRroxyComponentName,
Long: `starts a server for agent kube-apiserver`,
RunE: func(_ *cobra.Command, _ []string) error {
if err := opts.Validate(); err != nil {
return err
}
return run(ctx, opts)
},
Args: func(cmd *cobra.Command, args []string) error {
Expand All @@ -37,24 +36,26 @@ func NewClusterLinkProxyCommand(ctx context.Context) *cobra.Command {
return nil
},
}
namedFlagSets := opts.Flags()

fs := cmd.Flags()
for _, f := range namedFlagSets.FlagSets {
fs.AddFlagSet(f)
}
flags := cmd.Flags()

cols, _, err := term.TerminalSize(cmd.OutOrStdout())
if err != nil {
klog.Warning("term.TerminalSize err: %v", err)
} else {
cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols)
}
fss := cliflag.NamedFlagSets{}
genericFlagSet := fss.FlagSet("generic")
opts.AddFlags(genericFlagSet)

logsFlagSet := fss.FlagSet("logs")
klogflag.Add(logsFlagSet)

flags.AddFlagSet(genericFlagSet)
flags.AddFlagSet(logsFlagSet)

return cmd
}

func run(ctx context.Context, opts *options.Options) error {
// pprof
profileflag.ListenAndServe(opts.ProfileOpts)

config, err := opts.Config()
if err != nil {
return err
Expand All @@ -65,5 +66,17 @@ func run(ctx context.Context, opts *options.Options) error {
return err
}

server.GenericAPIServer.AddPostStartHookOrDie("start-proxy-controller", func(context genericapiserver.PostStartHookContext) error {
go func() {
config.ExtraConfig.ProxyController.Run(context.StopCh, 1)
}()
return nil
})

server.GenericAPIServer.AddPostStartHookOrDie("start-apiserver-informer", func(context genericapiserver.PostStartHookContext) error {
config.ExtraConfig.KosmosInformerFactory.Start(context.StopCh)
return nil
})

return server.GenericAPIServer.PrepareRun().Run(ctx.Done())
}
206 changes: 98 additions & 108 deletions cmd/clusterlink/proxy/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,118 +2,81 @@ package options

import (
"fmt"
"log"
"net"
"net/http"
"strings"

"github.com/spf13/pflag"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
"k8s.io/apiserver/pkg/endpoints/openapi"
genericrequest "k8s.io/apiserver/pkg/endpoints/request"
genericapiserver "k8s.io/apiserver/pkg/server"
genericserver "k8s.io/apiserver/pkg/server"
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/util/feature"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/featuregate"
"k8s.io/component-base/logs"
logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

"github.com/kosmos.io/kosmos/pkg/apis/proxy/scheme"
proxyScheme "github.com/kosmos.io/kosmos/pkg/apis/proxy/scheme"
"github.com/kosmos.io/kosmos/pkg/clusterlink/proxy"
proxyctl "github.com/kosmos.io/kosmos/pkg/clusterlink/proxy/controller"
kosmosclientset "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
informerfactory "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions"
generatedopenapi "github.com/kosmos.io/kosmos/pkg/generated/openapi"
profileflag "github.com/kosmos.io/kosmos/pkg/sharedcli/profileflag"
"github.com/kosmos.io/kosmos/pkg/utils"
)

// Options contains command line parameters for clusterlink-proxy
type Options struct {
MaxRequestsInFlight int
MaxMutatingRequestsInFlight int
utils.KubernetesOptions

Logs *logs.Options
SecureServing *genericoptions.SecureServingOptionsWithLoopback
Authentication *genericoptions.DelegatingAuthenticationOptions
Authorization *genericoptions.DelegatingAuthorizationOptions
Audit *genericoptions.AuditOptions
Features *genericoptions.FeatureOptions
CoreAPI *genericoptions.CoreAPIOptions
FeatureGate featuregate.FeatureGate
Admission *genericoptions.AdmissionOptions
// RecommendedOptions *genericoptions.RecommendedOptions
GenericServerRunOptions *genericoptions.ServerRunOptions
SecureServing *genericoptions.SecureServingOptionsWithLoopback
Authentication *genericoptions.DelegatingAuthenticationOptions
Authorization *genericoptions.DelegatingAuthorizationOptions
Audit *genericoptions.AuditOptions
Features *genericoptions.FeatureOptions
CoreAPI *genericoptions.CoreAPIOptions
ServerRunOptions *genericoptions.ServerRunOptions

ProfileOpts profileflag.Options
}

// nolint
func NewOptions() *Options {
sso := genericoptions.NewSecureServingOptions()

// We are composing recommended options for an aggregated api-server,
// whose client is typically a proxy multiplexing many operations ---
// notably including long-running ones --- into one HTTP/2 connection
// into this server. So allow many concurrent operations.
sso.HTTP2MaxStreamsPerConnection = 1000

return &Options{
MaxRequestsInFlight: 0,
MaxMutatingRequestsInFlight: 0,

Logs: logs.NewOptions(),
SecureServing: sso.WithLoopback(),
Authentication: genericoptions.NewDelegatingAuthenticationOptions(),
Authorization: genericoptions.NewDelegatingAuthorizationOptions(),
Audit: genericoptions.NewAuditOptions(),
Features: genericoptions.NewFeatureOptions(),
CoreAPI: genericoptions.NewCoreAPIOptions(),
FeatureGate: feature.DefaultFeatureGate,
Admission: genericoptions.NewAdmissionOptions(),
}
func (o *Options) AddFlags(flags *pflag.FlagSet) {
o.SecureServing.AddFlags(flags)
o.Authentication.AddFlags(flags)
o.Authorization.AddFlags(flags)
o.Audit.AddFlags(flags)
o.Features.AddFlags(flags)
o.CoreAPI.AddFlags(flags)
o.ServerRunOptions.AddUniversalFlags(flags)
o.ProfileOpts.AddFlags(flags)
}

// nolint
func (o *Options) Validate() error {
errors := []error{}
errors = append(errors, o.validateGenericOptions()...)
return utilerrors.NewAggregate(errors)
}

func (o *Options) validateGenericOptions() []error {
errors := []error{}
if o.MaxRequestsInFlight < 0 {
errors = append(errors, fmt.Errorf("--max-requests-inflight can not be negative value"))
}
if o.MaxMutatingRequestsInFlight < 0 {
errors = append(errors, fmt.Errorf("--max-mutating-requests-inflight can not be negative value"))
func NewOptions() *Options {
o := &Options{
GenericServerRunOptions: genericoptions.NewServerRunOptions(),
SecureServing: genericoptions.NewSecureServingOptions().WithLoopback(),
Authentication: genericoptions.NewDelegatingAuthenticationOptions(),
Authorization: genericoptions.NewDelegatingAuthorizationOptions(),
Audit: genericoptions.NewAuditOptions(),
Features: genericoptions.NewFeatureOptions(),
CoreAPI: genericoptions.NewCoreAPIOptions(),
ServerRunOptions: genericoptions.NewServerRunOptions(),
}

errors = append(errors, o.CoreAPI.Validate()...)
errors = append(errors, o.SecureServing.Validate()...)
errors = append(errors, o.Authentication.Validate()...)
errors = append(errors, o.Authorization.Validate()...)
errors = append(errors, o.Audit.Validate()...)
errors = append(errors, o.Features.Validate()...)
return errors
return o
}

// nolint
func (o *Options) Flags() cliflag.NamedFlagSets {
var fss cliflag.NamedFlagSets

genericfs := fss.FlagSet("generic")
genericfs.IntVar(&o.MaxRequestsInFlight, "max-requests-inflight", o.MaxRequestsInFlight, ""+
"Otherwise, this flag limits the maximum number of non-mutating requests in flight, or a zero value disables the limit completely.")
genericfs.IntVar(&o.MaxMutatingRequestsInFlight, "max-mutating-requests-inflight", o.MaxMutatingRequestsInFlight, ""+
"this flag limits the maximum number of mutating requests in flight, or a zero value disables the limit completely.")

globalcfs := fss.FlagSet("global")
globalcfs.Float32Var(&o.KubernetesOptions.QPS, "kube-qps", utils.DefaultKubeQPS, "QPS to use while talking with kube-apiserver.")
globalcfs.IntVar(&o.KubernetesOptions.Burst, "kube-burst", utils.DefaultKubeBurst, "Burst to use while talking with kube-apiserver.")
o.CoreAPI.AddFlags(globalcfs)
o.SecureServing.AddFlags(fss.FlagSet("secure serving"))
o.Authentication.AddFlags(fss.FlagSet("authentication"))
o.Authorization.AddFlags(fss.FlagSet("authorization"))
o.Audit.AddFlags(fss.FlagSet("auditing"))
o.Features.AddFlags(fss.FlagSet("features"))
logsapi.AddFlags(o.Logs, fss.FlagSet("logs"))

// o.Admission.AddFlags(fss.FlagSet("admission"))
// o.Traces.AddFlags(fss.FlagSet("traces"))

return fss
func (o *Options) Validate() error {
errs := []error{}
errs = append(errs, o.SecureServing.Validate()...)
errs = append(errs, o.Authentication.Validate()...)
errs = append(errs, o.Authorization.Validate()...)
errs = append(errs, o.Audit.Validate()...)
errs = append(errs, o.Features.Validate()...)
return utilerrors.NewAggregate(errs)
}

// nolint
Expand All @@ -126,52 +89,79 @@ func (o *Options) Config() (*proxy.Config, error) {
return nil, fmt.Errorf("error create self-signed certificates: %v", err)
}

// remove NamespaceLifecycle admission plugin explicitly
// current admission plugins: mutatingwebhook, validatingwebhook
o.Admission.DisablePlugins = append(o.Admission.DisablePlugins, lifecycle.PluginName)
// o.Admission.DisablePlugins = append(o.RecommendedOptions.Admission.DisablePlugins, lifecycle.PluginName)

genericConfig := genericapiserver.NewRecommendedConfig(proxy.Codecs)
// genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(openapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(apiserver.Scheme))
// genericConfig.OpenAPIConfig.Info.Title = openAPITitle
// genericConfig.OpenAPIConfig.Info.Version= openAPIVersion
genericConfig := genericserver.NewRecommendedConfig(proxyScheme.Codecs)
genericConfig.OpenAPIConfig = genericserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(scheme.Scheme))
genericConfig.OpenAPIConfig.Info.Title = utils.KosmosClusrerLinkRroxyComponentName
genericConfig.OpenAPIConfig.Info.Version = utils.ClusterLinkOpenAPIVersion

// support watch to LongRunningFunc
genericConfig.LongRunningFunc = func(r *http.Request, requestInfo *genericrequest.RequestInfo) bool {
return strings.Contains(r.RequestURI, "watch")
}

if err := o.genericOptionsApplyTo(genericConfig); err != nil {
if err := o.ApplyTo(genericConfig); err != nil {
return nil, err
}

restMapper, err := apiutil.NewDynamicRESTMapper(genericConfig.ClientConfig)
if err != nil {
klog.Errorf("Failed to create REST mapper: %v", err)
return nil, err
}
kosmosClient := kosmosclientset.NewForConfigOrDie(genericConfig.ClientConfig)
kosmosInformerFactory := informerfactory.NewSharedInformerFactory(kosmosClient, 0)

dynamicClient, err := dynamic.NewForConfig(genericConfig.ClientConfig)
if err != nil {
log.Fatal(err)
}

proxyCtl, err := proxyctl.NewResourceCacheController(proxyctl.NewControllerOption{
RestConfig: genericConfig.ClientConfig,
RestMapper: restMapper,
KosmosFactory: kosmosInformerFactory,
DynamicClient: dynamicClient,
})
if err != nil {
return nil, err
}

return &proxy.Config{
GenericConfig: genericConfig,
ExtraConfig: proxy.ExtraConfig{
ProxyController: proxyCtl,
KosmosInformerFactory: kosmosInformerFactory,
},
}, nil
}

func (o *Options) genericOptionsApplyTo(config *genericapiserver.RecommendedConfig) error {
config.MaxRequestsInFlight = o.MaxRequestsInFlight
config.MaxMutatingRequestsInFlight = o.MaxMutatingRequestsInFlight

if err := o.SecureServing.ApplyTo(&config.SecureServing, &config.LoopbackClientConfig); err != nil {
func (o *Options) ApplyTo(config *genericserver.RecommendedConfig) error {
if err := o.SecureServing.ApplyTo(&config.Config.SecureServing, &config.Config.LoopbackClientConfig); err != nil {
return err
}
if err := o.Authentication.ApplyTo(&config.Authentication, config.SecureServing, config.OpenAPIConfig); err != nil {
if err := o.Authentication.ApplyTo(&config.Config.Authentication, config.SecureServing, config.OpenAPIConfig); err != nil {
return err
}
if err := o.Authorization.ApplyTo(&config.Authorization); err != nil {
if err := o.Authorization.ApplyTo(&config.Config.Authorization); err != nil {
return err
}
if err := o.Audit.ApplyTo(&config.Config); err != nil {
return err
}

if err := o.Features.ApplyTo(&config.Config); err != nil {
return err
}
if err := o.CoreAPI.ApplyTo(config); err != nil {
return err
}

utils.SetQPSBurst(config.ClientConfig, o.KubernetesOptions)
return o.Admission.ApplyTo(&config.Config, config.SharedInformerFactory, config.ClientConfig, o.FeatureGate)
if err := o.ServerRunOptions.ApplyTo(&config.Config); err != nil {
return err
}
if err := o.Features.ApplyTo(&config.Config); err != nil {
return err
}
return nil
}
Loading