Skip to content

Commit

Permalink
feat: Implementation aggregator apiserver and resourcecache controlle…
Browse files Browse the repository at this point in the history
…r manager

1. aggregator apiserver install resourcecache and proxy api group
2. controller for reconcile resourcecache and handle proxy request

Signed-off-by: chengjin <[email protected]>
  • Loading branch information
JimDevil committed Jan 24, 2025
1 parent a034af0 commit 4a350c5
Show file tree
Hide file tree
Showing 20 changed files with 1,196 additions and 523 deletions.
57 changes: 35 additions & 22 deletions cmd/clusterlink/proxy/app/clusterlink-proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,26 @@ import (
"fmt"

"github.com/spf13/cobra"
genericapiserver "k8s.io/apiserver/pkg/server"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"

"github.com/kosmos.io/kosmos/cmd/clusterlink/proxy/app/options"
"github.com/kosmos.io/kosmos/pkg/sharedcli/klogflag"
profileflag "github.com/kosmos.io/kosmos/pkg/sharedcli/profileflag"
"github.com/kosmos.io/kosmos/pkg/utils"
)

// NewClusterLinkProxyCommand creates a *cobra.Command object with default parameters
func NewClusterLinkProxyCommand(ctx context.Context) *cobra.Command {
opts := options.NewOptions()

cmd := &cobra.Command{
Use: "proxy",
Long: `The proxy starts a apiserver for agent access the backend proxy`,
RunE: func(cmd *cobra.Command, args []string) error {
// validate options
/*
if errs := opts.Validate(); len(errs) != 0 {
return errs.ToAggregate()
}
*/
Use: utils.KosmosClusrerLinkRroxyComponentName,
Long: `starts a server for agent kube-apiserver`,
RunE: func(_ *cobra.Command, _ []string) error {
if err := opts.Validate(); err != nil {
return err
}
return run(ctx, opts)
},
Args: func(cmd *cobra.Command, args []string) error {
Expand All @@ -37,24 +36,26 @@ func NewClusterLinkProxyCommand(ctx context.Context) *cobra.Command {
return nil
},
}
namedFlagSets := opts.Flags()

fs := cmd.Flags()
for _, f := range namedFlagSets.FlagSets {
fs.AddFlagSet(f)
}
flags := cmd.Flags()

cols, _, err := term.TerminalSize(cmd.OutOrStdout())
if err != nil {
klog.Warning("term.TerminalSize err: %v", err)
} else {
cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols)
}
fss := cliflag.NamedFlagSets{}
genericFlagSet := fss.FlagSet("generic")
opts.AddFlags(genericFlagSet)

logsFlagSet := fss.FlagSet("logs")
klogflag.Add(logsFlagSet)

flags.AddFlagSet(genericFlagSet)
flags.AddFlagSet(logsFlagSet)

return cmd
}

func run(ctx context.Context, opts *options.Options) error {
// pprof
profileflag.ListenAndServe(opts.ProfileOpts)

config, err := opts.Config()
if err != nil {
return err
Expand All @@ -65,5 +66,17 @@ func run(ctx context.Context, opts *options.Options) error {
return err
}

server.GenericAPIServer.AddPostStartHookOrDie("start-proxy-controller", func(context genericapiserver.PostStartHookContext) error {
go func() {
config.ExtraConfig.ProxyController.Run(context.StopCh, 1)
}()
return nil
})

server.GenericAPIServer.AddPostStartHookOrDie("start-apiserver-informer", func(context genericapiserver.PostStartHookContext) error {
config.ExtraConfig.KosmosInformerFactory.Start(context.StopCh)
return nil
})

return server.GenericAPIServer.PrepareRun().Run(ctx.Done())
}
206 changes: 98 additions & 108 deletions cmd/clusterlink/proxy/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,118 +2,81 @@ package options

import (
"fmt"
"log"
"net"
"net/http"
"strings"

"github.com/spf13/pflag"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
"k8s.io/apiserver/pkg/endpoints/openapi"
genericrequest "k8s.io/apiserver/pkg/endpoints/request"
genericapiserver "k8s.io/apiserver/pkg/server"
genericserver "k8s.io/apiserver/pkg/server"
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/util/feature"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/featuregate"
"k8s.io/component-base/logs"
logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

"github.com/kosmos.io/kosmos/pkg/apis/proxy/scheme"
proxyScheme "github.com/kosmos.io/kosmos/pkg/apis/proxy/scheme"
"github.com/kosmos.io/kosmos/pkg/clusterlink/proxy"
proxyctl "github.com/kosmos.io/kosmos/pkg/clusterlink/proxy/controller"
kosmosclientset "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
informerfactory "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions"
generatedopenapi "github.com/kosmos.io/kosmos/pkg/generated/openapi"
profileflag "github.com/kosmos.io/kosmos/pkg/sharedcli/profileflag"
"github.com/kosmos.io/kosmos/pkg/utils"
)

// Options contains command line parameters for clusterlink-proxy
type Options struct {
MaxRequestsInFlight int
MaxMutatingRequestsInFlight int
utils.KubernetesOptions

Logs *logs.Options
SecureServing *genericoptions.SecureServingOptionsWithLoopback
Authentication *genericoptions.DelegatingAuthenticationOptions
Authorization *genericoptions.DelegatingAuthorizationOptions
Audit *genericoptions.AuditOptions
Features *genericoptions.FeatureOptions
CoreAPI *genericoptions.CoreAPIOptions
FeatureGate featuregate.FeatureGate
Admission *genericoptions.AdmissionOptions
// RecommendedOptions *genericoptions.RecommendedOptions
GenericServerRunOptions *genericoptions.ServerRunOptions
SecureServing *genericoptions.SecureServingOptionsWithLoopback
Authentication *genericoptions.DelegatingAuthenticationOptions
Authorization *genericoptions.DelegatingAuthorizationOptions
Audit *genericoptions.AuditOptions
Features *genericoptions.FeatureOptions
CoreAPI *genericoptions.CoreAPIOptions
ServerRunOptions *genericoptions.ServerRunOptions

ProfileOpts profileflag.Options
}

// nolint
func NewOptions() *Options {
sso := genericoptions.NewSecureServingOptions()

// We are composing recommended options for an aggregated api-server,
// whose client is typically a proxy multiplexing many operations ---
// notably including long-running ones --- into one HTTP/2 connection
// into this server. So allow many concurrent operations.
sso.HTTP2MaxStreamsPerConnection = 1000

return &Options{
MaxRequestsInFlight: 0,
MaxMutatingRequestsInFlight: 0,

Logs: logs.NewOptions(),
SecureServing: sso.WithLoopback(),
Authentication: genericoptions.NewDelegatingAuthenticationOptions(),
Authorization: genericoptions.NewDelegatingAuthorizationOptions(),
Audit: genericoptions.NewAuditOptions(),
Features: genericoptions.NewFeatureOptions(),
CoreAPI: genericoptions.NewCoreAPIOptions(),
FeatureGate: feature.DefaultFeatureGate,
Admission: genericoptions.NewAdmissionOptions(),
}
func (o *Options) AddFlags(flags *pflag.FlagSet) {
o.SecureServing.AddFlags(flags)
o.Authentication.AddFlags(flags)
o.Authorization.AddFlags(flags)
o.Audit.AddFlags(flags)
o.Features.AddFlags(flags)
o.CoreAPI.AddFlags(flags)
o.ServerRunOptions.AddUniversalFlags(flags)
o.ProfileOpts.AddFlags(flags)
}

// nolint
func (o *Options) Validate() error {
errors := []error{}
errors = append(errors, o.validateGenericOptions()...)
return utilerrors.NewAggregate(errors)
}

func (o *Options) validateGenericOptions() []error {
errors := []error{}
if o.MaxRequestsInFlight < 0 {
errors = append(errors, fmt.Errorf("--max-requests-inflight can not be negative value"))
}
if o.MaxMutatingRequestsInFlight < 0 {
errors = append(errors, fmt.Errorf("--max-mutating-requests-inflight can not be negative value"))
func NewOptions() *Options {
o := &Options{
GenericServerRunOptions: genericoptions.NewServerRunOptions(),
SecureServing: genericoptions.NewSecureServingOptions().WithLoopback(),
Authentication: genericoptions.NewDelegatingAuthenticationOptions(),
Authorization: genericoptions.NewDelegatingAuthorizationOptions(),
Audit: genericoptions.NewAuditOptions(),
Features: genericoptions.NewFeatureOptions(),
CoreAPI: genericoptions.NewCoreAPIOptions(),
ServerRunOptions: genericoptions.NewServerRunOptions(),
}

errors = append(errors, o.CoreAPI.Validate()...)
errors = append(errors, o.SecureServing.Validate()...)
errors = append(errors, o.Authentication.Validate()...)
errors = append(errors, o.Authorization.Validate()...)
errors = append(errors, o.Audit.Validate()...)
errors = append(errors, o.Features.Validate()...)
return errors
return o
}

// nolint
func (o *Options) Flags() cliflag.NamedFlagSets {
var fss cliflag.NamedFlagSets

genericfs := fss.FlagSet("generic")
genericfs.IntVar(&o.MaxRequestsInFlight, "max-requests-inflight", o.MaxRequestsInFlight, ""+
"Otherwise, this flag limits the maximum number of non-mutating requests in flight, or a zero value disables the limit completely.")
genericfs.IntVar(&o.MaxMutatingRequestsInFlight, "max-mutating-requests-inflight", o.MaxMutatingRequestsInFlight, ""+
"this flag limits the maximum number of mutating requests in flight, or a zero value disables the limit completely.")

globalcfs := fss.FlagSet("global")
globalcfs.Float32Var(&o.KubernetesOptions.QPS, "kube-qps", utils.DefaultKubeQPS, "QPS to use while talking with kube-apiserver.")
globalcfs.IntVar(&o.KubernetesOptions.Burst, "kube-burst", utils.DefaultKubeBurst, "Burst to use while talking with kube-apiserver.")
o.CoreAPI.AddFlags(globalcfs)
o.SecureServing.AddFlags(fss.FlagSet("secure serving"))
o.Authentication.AddFlags(fss.FlagSet("authentication"))
o.Authorization.AddFlags(fss.FlagSet("authorization"))
o.Audit.AddFlags(fss.FlagSet("auditing"))
o.Features.AddFlags(fss.FlagSet("features"))
logsapi.AddFlags(o.Logs, fss.FlagSet("logs"))

// o.Admission.AddFlags(fss.FlagSet("admission"))
// o.Traces.AddFlags(fss.FlagSet("traces"))

return fss
func (o *Options) Validate() error {
errs := []error{}
errs = append(errs, o.SecureServing.Validate()...)
errs = append(errs, o.Authentication.Validate()...)
errs = append(errs, o.Authorization.Validate()...)
errs = append(errs, o.Audit.Validate()...)
errs = append(errs, o.Features.Validate()...)
return utilerrors.NewAggregate(errs)
}

// nolint
Expand All @@ -126,52 +89,79 @@ func (o *Options) Config() (*proxy.Config, error) {
return nil, fmt.Errorf("error create self-signed certificates: %v", err)
}

// remove NamespaceLifecycle admission plugin explicitly
// current admission plugins: mutatingwebhook, validatingwebhook
o.Admission.DisablePlugins = append(o.Admission.DisablePlugins, lifecycle.PluginName)
// o.Admission.DisablePlugins = append(o.RecommendedOptions.Admission.DisablePlugins, lifecycle.PluginName)

genericConfig := genericapiserver.NewRecommendedConfig(proxy.Codecs)
// genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(openapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(apiserver.Scheme))
// genericConfig.OpenAPIConfig.Info.Title = openAPITitle
// genericConfig.OpenAPIConfig.Info.Version= openAPIVersion
genericConfig := genericserver.NewRecommendedConfig(proxyScheme.Codecs)
genericConfig.OpenAPIConfig = genericserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(scheme.Scheme))
genericConfig.OpenAPIConfig.Info.Title = utils.KosmosClusrerLinkRroxyComponentName
genericConfig.OpenAPIConfig.Info.Version = utils.ClusterLinkOpenAPIVersion

// support watch to LongRunningFunc
genericConfig.LongRunningFunc = func(r *http.Request, requestInfo *genericrequest.RequestInfo) bool {
return strings.Contains(r.RequestURI, "watch")
}

if err := o.genericOptionsApplyTo(genericConfig); err != nil {
if err := o.ApplyTo(genericConfig); err != nil {
return nil, err
}

restMapper, err := apiutil.NewDynamicRESTMapper(genericConfig.ClientConfig)
if err != nil {
klog.Errorf("Failed to create REST mapper: %v", err)
return nil, err
}
kosmosClient := kosmosclientset.NewForConfigOrDie(genericConfig.ClientConfig)
kosmosInformerFactory := informerfactory.NewSharedInformerFactory(kosmosClient, 0)

dynamicClient, err := dynamic.NewForConfig(genericConfig.ClientConfig)
if err != nil {
log.Fatal(err)
}

proxyCtl, err := proxyctl.NewResourceCacheController(proxyctl.NewControllerOption{
RestConfig: genericConfig.ClientConfig,
RestMapper: restMapper,
KosmosFactory: kosmosInformerFactory,
DynamicClient: dynamicClient,
})
if err != nil {
return nil, err
}

return &proxy.Config{
GenericConfig: genericConfig,
ExtraConfig: proxy.ExtraConfig{
ProxyController: proxyCtl,
KosmosInformerFactory: kosmosInformerFactory,
},
}, nil
}

func (o *Options) genericOptionsApplyTo(config *genericapiserver.RecommendedConfig) error {
config.MaxRequestsInFlight = o.MaxRequestsInFlight
config.MaxMutatingRequestsInFlight = o.MaxMutatingRequestsInFlight

if err := o.SecureServing.ApplyTo(&config.SecureServing, &config.LoopbackClientConfig); err != nil {
func (o *Options) ApplyTo(config *genericserver.RecommendedConfig) error {
if err := o.SecureServing.ApplyTo(&config.Config.SecureServing, &config.Config.LoopbackClientConfig); err != nil {
return err
}
if err := o.Authentication.ApplyTo(&config.Authentication, config.SecureServing, config.OpenAPIConfig); err != nil {
if err := o.Authentication.ApplyTo(&config.Config.Authentication, config.SecureServing, config.OpenAPIConfig); err != nil {
return err
}
if err := o.Authorization.ApplyTo(&config.Authorization); err != nil {
if err := o.Authorization.ApplyTo(&config.Config.Authorization); err != nil {
return err
}
if err := o.Audit.ApplyTo(&config.Config); err != nil {
return err
}

if err := o.Features.ApplyTo(&config.Config); err != nil {
return err
}
if err := o.CoreAPI.ApplyTo(config); err != nil {
return err
}

utils.SetQPSBurst(config.ClientConfig, o.KubernetesOptions)
return o.Admission.ApplyTo(&config.Config, config.SharedInformerFactory, config.ClientConfig, o.FeatureGate)
if err := o.ServerRunOptions.ApplyTo(&config.Config); err != nil {
return err
}
if err := o.Features.ApplyTo(&config.Config); err != nil {
return err
}
return nil
}
Loading

0 comments on commit 4a350c5

Please sign in to comment.