diff --git a/cmd/yarn-operator/main.go b/cmd/yarn-operator/main.go index 4ca4b65e..573baa1c 100644 --- a/cmd/yarn-operator/main.go +++ b/cmd/yarn-operator/main.go @@ -20,15 +20,19 @@ import ( "flag" "math/rand" "net/http" + _ "net/http/pprof" "os" "time" "github.com/spf13/pflag" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/klog/v2" "k8s.io/klog/v2/klogr" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" utilclient "github.com/koordinator-sh/koordinator/pkg/util/client" "github.com/koordinator-sh/koordinator/pkg/util/fieldindex" @@ -49,6 +53,7 @@ func main() { var leaderElectionNamespace string var namespace string var syncPeriodStr string + var cacheSelectorKey, cacheSelectorValue, cacheSelectorNamespace string flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&healthProbeAddr, "health-probe-addr", ":8000", "The address the healthz/readyz endpoint binds to.") flag.BoolVar(&enableLeaderElection, "enable-leader-election", true, "Whether you need to enable leader election.") @@ -59,6 +64,13 @@ func main() { flag.BoolVar(&enablePprof, "enable-pprof", true, "Enable pprof for controller manager.") flag.StringVar(&pprofAddr, "pprof-addr", ":8090", "The address the pprof binds to.") flag.StringVar(&syncPeriodStr, "sync-period", "", "Determines the minimum frequency at which watched resources are reconciled.") + // By setting the following three parameters, the size of the field index cache can be effectively reduced, + // thereby reducing the memory occupied by the koord-yarn-operator. + // Warning: three parameters must be set at the same time to take effect. + flag.StringVar(&cacheSelectorKey, "cache-selector-key", "app", "The cacheSelectorLabel should be the key that marks the label of the NMpod.") + flag.StringVar(&cacheSelectorValue, "cache-selector-value", "hadoop-yarn", "The cacheSelectorValue should be the value that marks the label of the NMpod.") + flag.StringVar(&cacheSelectorNamespace, "cache-selector-namespace", "yarn-k8s-elastic", "The cacheSelectorNamespace should be the namespace of NMpod.") + opts := options.NewOptions() opts.InitFlags(flag.CommandLine) //sloconfig.InitFlags(flag.CommandLine) @@ -98,6 +110,21 @@ func main() { syncPeriod = &d } } + + // To reduce the memory size of the field index, we created a new cache that filters pods + // see: https://github.com/kubernetes-sigs/controller-runtime/pull/1419/files + var newCache cache.NewCacheFunc + if cacheSelectorKey != "" && cacheSelectorValue != "" && cacheSelectorNamespace != "" { + newCache = cache.BuilderWithOptions(cache.Options{ + SelectorsByObject: cache.SelectorsByObject{ + &corev1.Pod{}: { + Label: labels.SelectorFromSet(labels.Set{cacheSelectorKey: cacheSelectorValue}), + }, + }, + Namespace: cacheSelectorNamespace, + }) + } + mgr, err := ctrl.NewManager(cfg, ctrl.Options{ Scheme: options.Scheme, MetricsBindAddress: metricsAddr, @@ -107,6 +134,7 @@ func main() { LeaderElectionNamespace: leaderElectionNamespace, LeaderElectionResourceLock: resourcelock.ConfigMapsLeasesResourceLock, Namespace: namespace, + NewCache: newCache, SyncPeriod: syncPeriod, NewClient: utilclient.NewClient, })