From 1f6217c0046fc56fc72c743f2672d688f08acac4 Mon Sep 17 00:00:00 2001 From: Donnie Adams Date: Fri, 7 Feb 2025 14:05:04 -0500 Subject: [PATCH 1/2] enhance: start worker goroutines on demand Instead of having a bunch of waiting goroutines, this change will introduce one goroutine per type that waits for new tasks. That goroutine will spin up new goroutines, up to the desired limit, to handle tasks as they come in. Signed-off-by: Donnie Adams --- pkg/runtime/controller.go | 59 ++++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 25 deletions(-) diff --git a/pkg/runtime/controller.go b/pkg/runtime/controller.go index 6132f16..08bb9a2 100644 --- a/pkg/runtime/controller.go +++ b/pkg/runtime/controller.go @@ -14,7 +14,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" clientgocache "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -148,20 +147,12 @@ func (c *controller) run(ctx context.Context, workers int) { c.startLock.Unlock() defer utilruntime.HandleCrash() - defer func() { - c.workqueue.ShutDown() - }() // Start the informer factories to begin populating the informer caches log.Infof("Starting %s controller", c.name) - for i := 0; i < workers; i++ { - go wait.Until(func() { - c.runWorker(ctx) - }, time.Second, ctx.Done()) - } + c.runWorkers(ctx, workers) - <-ctx.Done() c.startLock.Lock() defer c.startLock.Unlock() c.started = false @@ -217,26 +208,44 @@ func (c *controller) Start(ctx context.Context, workers int) error { return nil } -func (c *controller) runWorker(ctx context.Context) { - for c.processNextWorkItem(ctx) { - } -} +func (c *controller) runWorkers(ctx context.Context, workers int) { + wait := sync.WaitGroup{} + running := make(chan struct{}, workers) -func (c *controller) processNextWorkItem(ctx context.Context) bool { - obj, shutdown := c.workqueue.Get() + defer func() { + defer wait.Wait() + }() - if shutdown { - return false - } + defer close(running) - if err := c.processSingleItem(ctx, obj); err != nil { - if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") { - log.Errorf("%v", err) + go func() { + <-ctx.Done() + c.workqueue.ShutDown() + }() + + for { + obj, shutdown := c.workqueue.Get() + + if shutdown { + return } - return true - } - return true + running <- struct{}{} + wait.Add(1) + + go func() { + defer func() { + <-running + wait.Done() + }() + + if err := c.processSingleItem(ctx, obj); err != nil { + if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") { + log.Errorf("%v", err) + } + } + }() + } } func (c *controller) processSingleItem(ctx context.Context, obj interface{}) error { From 9ff66f3cdd1a40c651b751b6e23646ed821ea3bd Mon Sep 17 00:00:00 2001 From: Donnie Adams Date: Mon, 10 Feb 2025 09:16:57 -0500 Subject: [PATCH 2/2] feat: add the ability to use multiple worker queues per GVK A QueueSplitter interface is added that allows the caller to have multiple worker queues per GVK. This allows for a sort of priority-based queuing mechanism. 
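As a rough illustration of the new interface (the type name, namespace, and queue layout below are invented for the example and are not part of this change), a splitter that gives one namespace its own high-priority queue could look like:

    package example

    import "strings"

    // prioritySplitter implements WorkerQueueSplitter: keys from the
    // "critical" namespace go to queue 1, everything else to queue 0.
    // Keys typically arrive as "namespace/name" strings.
    type prioritySplitter struct{}

    // Queues tells the controller how many worker queues to create.
    func (*prioritySplitter) Queues() int { return 2 }

    // Split picks the queue index for a given key.
    func (*prioritySplitter) Split(key string) int {
        if strings.HasPrefix(key, "critical/") {
            return 1
        }
        return 0
    }

Each queue gets its own share of the configured workers, so keys routed to the dedicated queue are not starved by a backlog on the default queue.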
Signed-off-by: Donnie Adams --- pkg/backend/backend.go | 2 +- pkg/router/handler.go | 4 +- pkg/router/trigger.go | 4 +- pkg/runtime/backend.go | 6 +- pkg/runtime/clients.go | 6 +- pkg/runtime/controller.go | 136 ++++++++++++++++--------- pkg/runtime/sharedcontrollerfactory.go | 38 +++---- router.go | 13 ++- 8 files changed, 132 insertions(+), 77 deletions(-) diff --git a/pkg/backend/backend.go b/pkg/backend/backend.go index 171b369..74320b7 100644 --- a/pkg/backend/backend.go +++ b/pkg/backend/backend.go @@ -13,7 +13,7 @@ import ( type Callback func(gvk schema.GroupVersionKind, key string, obj runtime.Object) (runtime.Object, error) type Trigger interface { - Trigger(gvk schema.GroupVersionKind, key string, delay time.Duration) error + Trigger(ctx context.Context, gvk schema.GroupVersionKind, key string, delay time.Duration) error } type Watcher interface { diff --git a/pkg/router/handler.go b/pkg/router/handler.go index 9e9ca87..625d459 100644 --- a/pkg/router/handler.go +++ b/pkg/router/handler.go @@ -233,7 +233,7 @@ func (m *HandlerSet) checkDelay(gvk schema.GroupVersionKind, key string) bool { m.limiterLock.Lock() defer m.limiterLock.Unlock() delete(m.waiting, lKey) - _ = m.backend.Trigger(gvk, ReplayPrefix+key, 0) + _ = m.backend.Trigger(m.ctx, gvk, ReplayPrefix+key, 0) }() return false } @@ -341,7 +341,7 @@ func (m *HandlerSet) handle(gvk schema.GroupVersionKind, key string, unmodifiedO req.Object = newObj if resp.delay > 0 { - if err := m.backend.Trigger(gvk, key, resp.delay); err != nil { + if err := m.backend.Trigger(m.ctx, gvk, key, resp.delay); err != nil { return nil, err } } diff --git a/pkg/router/trigger.go b/pkg/router/trigger.go index 8665332..a60e125 100644 --- a/pkg/router/trigger.go +++ b/pkg/router/trigger.go @@ -44,7 +44,7 @@ func (m *triggers) invokeTriggers(req Request) { for _, matcher := range matchers { if matcher.Match(req.Namespace, req.Name, req.Object) { log.Debugf("Triggering [%s] [%v] from [%s] [%v]", et.key, et.gvk, req.Key, req.GVK) - _ = m.trigger.Trigger(et.gvk, et.key, 0) + _ = m.trigger.Trigger(req.Ctx, et.gvk, et.key, 0) break } } @@ -133,7 +133,7 @@ func (m *triggers) UnregisterAndTrigger(req Request) { } if targetGVK == req.GVK && mt.Match(req.Namespace, req.Name, req.Object) { log.Debugf("Triggering [%s] [%v] from [%s] [%v] on delete", target.key, target.gvk, req.Key, req.GVK) - _ = m.trigger.Trigger(target.gvk, target.key, 0) + _ = m.trigger.Trigger(req.Ctx, target.gvk, target.key, 0) } } } diff --git a/pkg/runtime/backend.go b/pkg/runtime/backend.go index 86b9895..1f6a7fc 100644 --- a/pkg/runtime/backend.go +++ b/pkg/runtime/backend.go @@ -82,8 +82,8 @@ func (b *Backend) start(ctx context.Context, preloadOnly bool) (err error) { return nil } -func (b *Backend) Trigger(gvk schema.GroupVersionKind, key string, delay time.Duration) error { - controller, err := b.cacheFactory.ForKind(gvk) +func (b *Backend) Trigger(ctx context.Context, gvk schema.GroupVersionKind, key string, delay time.Duration) error { + controller, err := b.cacheFactory.ForKind(ctx, gvk) if err != nil { return err } @@ -138,7 +138,7 @@ func (b *Backend) addIndexer(ctx context.Context, gvk schema.GroupVersionKind) e } func (b *Backend) Watcher(ctx context.Context, gvk schema.GroupVersionKind, name string, cb backend.Callback) error { - c, err := b.cacheFactory.ForKind(gvk) + c, err := b.cacheFactory.ForKind(ctx, gvk) if err != nil { return err } diff --git a/pkg/runtime/clients.go b/pkg/runtime/clients.go index 34165d4..1c20d3b 100644 --- a/pkg/runtime/clients.go +++ 
b/pkg/runtime/clients.go @@ -19,7 +19,8 @@ type Runtime struct { type Config struct { GroupConfig - GVKThreadiness map[schema.GroupVersionKind]int + GVKThreadiness map[schema.GroupVersionKind]int + GVKQueueSplitters map[schema.GroupVersionKind]WorkerQueueSplitter } type GroupConfig struct { @@ -69,7 +70,8 @@ func NewRuntimeWithConfigs(defaultConfig Config, apiGroupConfigs map[string]Grou aggCache := multi.NewCache(scheme, theCache, caches) factory := NewSharedControllerFactory(aggUncachedClient, aggCache, &SharedControllerFactoryOptions{ - KindWorkers: defaultConfig.GVKThreadiness, + KindWorkers: defaultConfig.GVKThreadiness, + KindQueueSplitter: defaultConfig.GVKQueueSplitters, // In nah this is only invoked when a key fails to process DefaultRateLimiter: workqueue.NewTypedMaxOfRateLimiter( // This will go .5, 1, 2, 4, 8 seconds, etc up until 15 minutes diff --git a/pkg/runtime/controller.go b/pkg/runtime/controller.go index 08bb9a2..f84ab82 100644 --- a/pkg/runtime/controller.go +++ b/pkg/runtime/controller.go @@ -48,7 +48,7 @@ type controller struct { startLock sync.Mutex name string - workqueue workqueue.TypedRateLimitingInterface[any] + workqueues []workqueue.TypedRateLimitingInterface[any] rateLimiter workqueue.TypedRateLimiter[any] informer cache.Informer handler Handler @@ -58,6 +58,7 @@ type controller struct { registration clientgocache.ResourceEventHandlerRegistration obj runtime.Object cache cache.Cache + splitter WorkerQueueSplitter } type startKey struct { @@ -66,10 +67,26 @@ type startKey struct { } type Options struct { - RateLimiter workqueue.TypedRateLimiter[any] + RateLimiter workqueue.TypedRateLimiter[any] + QueueSplitter WorkerQueueSplitter } -func New(gvk schema.GroupVersionKind, scheme *runtime.Scheme, cache cache.Cache, handler Handler, opts *Options) (Controller, error) { +type WorkerQueueSplitter interface { + Queues() int + Split(key string) int +} + +type singleWorkerQueueSplitter struct{} + +func (*singleWorkerQueueSplitter) Queues() int { + return 1 +} + +func (*singleWorkerQueueSplitter) Split(string) int { + return 0 +} + +func New(ctx context.Context, gvk schema.GroupVersionKind, scheme *runtime.Scheme, cache cache.Cache, handler Handler, opts *Options) (Controller, error) { opts = applyDefaultOptions(opts) obj, err := newObject(scheme, gvk) @@ -77,7 +94,7 @@ func New(gvk schema.GroupVersionKind, scheme *runtime.Scheme, cache cache.Cache, return nil, err } - informer, err := cache.GetInformerForKind(context.TODO(), gvk) + informer, err := cache.GetInformerForKind(ctx, gvk) if err != nil { return nil, err } @@ -90,6 +107,7 @@ func New(gvk schema.GroupVersionKind, scheme *runtime.Scheme, cache cache.Cache, obj: obj, rateLimiter: opts.RateLimiter, informer: informer, + splitter: opts.QueueSplitter, } return controller, nil @@ -114,6 +132,9 @@ func applyDefaultOptions(opts *Options) *Options { workqueue.NewTypedItemExponentialFailureRateLimiter[any](5*time.Millisecond, 30*time.Second), ) } + if newOpts.QueueSplitter == nil { + newOpts.QueueSplitter = (*singleWorkerQueueSplitter)(nil) + } return &newOpts } @@ -135,12 +156,15 @@ func (c *controller) run(ctx context.Context, workers int) { // will create a goroutine under the hood. It we instantiate a workqueue we must have // a mechanism to Shutdown it down. 
Without the stopCh we don't know when to shutdown // the queue and release the goroutine - c.workqueue = workqueue.NewTypedRateLimitingQueueWithConfig(c.rateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{Name: c.name}) + c.workqueues = make([]workqueue.TypedRateLimitingInterface[any], c.splitter.Queues()) + for i := range c.workqueues { + c.workqueues[i] = workqueue.NewTypedRateLimitingQueueWithConfig(c.rateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{Name: fmt.Sprintf("%s-%d", c.name, i)}) + } for _, start := range c.startKeys { if start.after == 0 { - c.workqueue.Add(start.key) + c.workqueues[c.splitter.Split(start.key)].Add(start.key) } else { - c.workqueue.AddAfter(start.key, start.after) + c.workqueues[c.splitter.Split(start.key)].AddAfter(start.key, start.after) } } c.startKeys = nil @@ -210,63 +234,75 @@ func (c *controller) Start(ctx context.Context, workers int) error { func (c *controller) runWorkers(ctx context.Context, workers int) { wait := sync.WaitGroup{} - running := make(chan struct{}, workers) + workers = workers / len(c.workqueues) + if workers == 0 { + workers = 1 + } defer func() { defer wait.Wait() }() - defer close(running) - - go func() { - <-ctx.Done() - c.workqueue.ShutDown() - }() - - for { - obj, shutdown := c.workqueue.Get() - - if shutdown { - return - } + for _, queue := range c.workqueues { + go func() { + // This channel acts as a semaphore to limit the number of concurrent + // work items handled by this controller. + running := make(chan struct{}, workers) + defer close(running) - running <- struct{}{} - wait.Add(1) + for { + obj, shutdown := queue.Get() - go func() { - defer func() { - <-running - wait.Done() - }() - - if err := c.processSingleItem(ctx, obj); err != nil { - if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") { - log.Errorf("%v", err) + if shutdown { + return } + + // Acquire from the semaphore + running <- struct{}{} + wait.Add(1) + + go func() { + defer func() { + // Release to the semaphore + <-running + wait.Done() + }() + + if err := c.processSingleItem(ctx, queue, obj); err != nil { + if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") { + log.Errorf("%v", err) + } + } + }() } }() } + + <-ctx.Done() + for i := range c.workqueues { + c.workqueues[i].ShutDown() + } } -func (c *controller) processSingleItem(ctx context.Context, obj interface{}) error { +func (c *controller) processSingleItem(ctx context.Context, queue workqueue.TypedRateLimitingInterface[any], obj interface{}) error { var ( key string ok bool ) - defer c.workqueue.Done(obj) + defer queue.Done(obj) if key, ok = obj.(string); !ok { - c.workqueue.Forget(obj) + queue.Forget(obj) log.Errorf("expected string in workqueue but got %#v", obj) return nil } if err := c.syncHandler(ctx, key); err != nil { - c.workqueue.AddRateLimited(key) + queue.AddRateLimited(key) return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) } - c.workqueue.Forget(obj) + queue.Forget(obj) return nil } @@ -280,7 +316,7 @@ func (c *controller) syncHandler(ctx context.Context, key string) error { return c.handler.OnChange(key, nil) } - ns, name := keyParse(key) + ns, name := KeyParse(key) obj := c.obj.DeepCopyObject().(kclient.Object) err := c.cache.Get(ctx, kclient.ObjectKey{ Name: name, @@ -299,10 +335,10 @@ func (c *controller) EnqueueKey(key string) { c.startLock.Lock() defer c.startLock.Unlock() - if c.workqueue == nil { + if c.workqueues == nil { c.startKeys = 
append(c.startKeys, startKey{key: key}) } else { - c.workqueue.Add(key) + c.workqueues[c.splitter.Split(key)].Add(key) } } @@ -312,10 +348,10 @@ func (c *controller) Enqueue(namespace, name string) { c.startLock.Lock() defer c.startLock.Unlock() - if c.workqueue == nil { + if c.workqueues == nil { c.startKeys = append(c.startKeys, startKey{key: key}) } else { - c.workqueue.AddRateLimited(key) + c.workqueues[c.splitter.Split(key)].AddRateLimited(key) } } @@ -325,15 +361,19 @@ func (c *controller) EnqueueAfter(namespace, name string, duration time.Duration c.startLock.Lock() defer c.startLock.Unlock() - if c.workqueue == nil { + if c.workqueues == nil { c.startKeys = append(c.startKeys, startKey{key: key, after: duration}) } else { - c.workqueue.AddAfter(key, duration) + c.workqueues[c.splitter.Split(key)].AddAfter(key, duration) } } -func keyParse(key string) (namespace string, name string) { - var ok bool +func KeyParse(key string) (namespace string, name string) { + special, key, ok := strings.Cut(key, " ") + if !ok { + key = special + } + namespace, name, ok = strings.Cut(key, "/") if !ok { name = namespace @@ -357,10 +397,10 @@ func (c *controller) enqueue(obj interface{}) { return } c.startLock.Lock() - if c.workqueue == nil { + if c.workqueues == nil { c.startKeys = append(c.startKeys, startKey{key: key}) } else { - c.workqueue.Add(key) + c.workqueues[c.splitter.Split(key)].Add(key) } c.startLock.Unlock() } diff --git a/pkg/runtime/sharedcontrollerfactory.go b/pkg/runtime/sharedcontrollerfactory.go index 32b0f9b..b57e08c 100644 --- a/pkg/runtime/sharedcontrollerfactory.go +++ b/pkg/runtime/sharedcontrollerfactory.go @@ -11,7 +11,7 @@ import ( ) type SharedControllerFactory interface { - ForKind(gvk schema.GroupVersionKind) (SharedController, error) + ForKind(ctx context.Context, gvk schema.GroupVersionKind) (SharedController, error) Preload(ctx context.Context) error Start(ctx context.Context) error } @@ -20,8 +20,9 @@ type SharedControllerFactoryOptions struct { DefaultRateLimiter workqueue.TypedRateLimiter[any] DefaultWorkers int - KindRateLimiter map[schema.GroupVersionKind]workqueue.TypedRateLimiter[any] - KindWorkers map[schema.GroupVersionKind]int + KindRateLimiter map[schema.GroupVersionKind]workqueue.TypedRateLimiter[any] + KindWorkers map[schema.GroupVersionKind]int + KindQueueSplitter map[schema.GroupVersionKind]WorkerQueueSplitter } type sharedControllerFactory struct { @@ -33,22 +34,24 @@ type sharedControllerFactory struct { client kclient.Client controllers map[schema.GroupVersionKind]*sharedController - rateLimiter workqueue.TypedRateLimiter[any] - workers int - kindRateLimiter map[schema.GroupVersionKind]workqueue.TypedRateLimiter[any] - kindWorkers map[schema.GroupVersionKind]int + rateLimiter workqueue.TypedRateLimiter[any] + workers int + kindRateLimiter map[schema.GroupVersionKind]workqueue.TypedRateLimiter[any] + kindWorkers map[schema.GroupVersionKind]int + kindQueueSplitter map[schema.GroupVersionKind]WorkerQueueSplitter } func NewSharedControllerFactory(c kclient.Client, cache cache.Cache, opts *SharedControllerFactoryOptions) SharedControllerFactory { opts = applyDefaultSharedOptions(opts) return &sharedControllerFactory{ - cache: cache, - client: c, - controllers: map[schema.GroupVersionKind]*sharedController{}, - workers: opts.DefaultWorkers, - kindWorkers: opts.KindWorkers, - rateLimiter: opts.DefaultRateLimiter, - kindRateLimiter: opts.KindRateLimiter, + cache: cache, + client: c, + controllers: map[schema.GroupVersionKind]*sharedController{}, + workers: 
opts.DefaultWorkers, + kindWorkers: opts.KindWorkers, + rateLimiter: opts.DefaultRateLimiter, + kindRateLimiter: opts.KindRateLimiter, + kindQueueSplitter: opts.KindQueueSplitter, } } @@ -114,7 +117,7 @@ func (s *sharedControllerFactory) loadAndStart(ctx context.Context, start bool) return nil } -func (s *sharedControllerFactory) ForKind(gvk schema.GroupVersionKind) (SharedController, error) { +func (s *sharedControllerFactory) ForKind(ctx context.Context, gvk schema.GroupVersionKind) (SharedController, error) { controllerResult := s.byGVK(gvk) if controllerResult != nil { return controllerResult, nil @@ -137,8 +140,9 @@ func (s *sharedControllerFactory) ForKind(gvk schema.GroupVersionKind) (SharedCo rateLimiter = s.rateLimiter } - return New(gvk, s.client.Scheme(), s.cache, handler, &Options{ - RateLimiter: rateLimiter, + return New(ctx, gvk, s.client.Scheme(), s.cache, handler, &Options{ + RateLimiter: rateLimiter, + QueueSplitter: s.kindQueueSplitter[gvk], }) }, handler: handler, diff --git a/router.go b/router.go index 7b5fdba..ab53120 100644 --- a/router.go +++ b/router.go @@ -33,6 +33,8 @@ type Options struct { HealthzPort int // Change the threadedness per GVK GVKThreadiness map[schema.GroupVersionKind]int + // Split the worker queues for these GVKs + GVKQueueSplitters map[schema.GroupVersionKind]nruntime.WorkerQueueSplitter } func (o *Options) complete() (*Options, error) { @@ -61,7 +63,14 @@ func (o *Options) complete() (*Options, error) { } } - defaultConfig := nruntime.Config{GroupConfig: nruntime.GroupConfig{Rest: result.DefaultRESTConfig, Namespace: result.DefaultNamespace}, GVKThreadiness: result.GVKThreadiness} + defaultConfig := nruntime.Config{ + GroupConfig: nruntime.GroupConfig{ + Rest: result.DefaultRESTConfig, + Namespace: result.DefaultNamespace, + }, + GVKThreadiness: result.GVKThreadiness, + GVKQueueSplitters: result.GVKQueueSplitters, + } backend, err := nruntime.NewRuntimeWithConfigs(defaultConfig, result.APIGroupConfigs, result.Scheme) if err != nil { return nil, err @@ -93,7 +102,7 @@ func DefaultOptions(routerName string, scheme *runtime.Scheme) (*Options, error) } // DefaultRouter The routerName is important as this name will be used to assign ownership of objects created by this -// router. Specifically the routerName is assigned to the sub-context in the apply actions. Additionally, the routerName +// router. Specifically, the routerName is assigned to the sub-context in the apply actions. Additionally, the routerName // will be used for the leader election lease lock. func DefaultRouter(routerName string, scheme *runtime.Scheme) (*router.Router, error) { opts, err := DefaultOptions(routerName, scheme)
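
For callers, a minimal wiring sketch (the router name, GVK, and the prioritySplitter type from the commit message above are placeholders; GVKs without an entry keep the default single worker queue):

    func exampleOptions(scheme *runtime.Scheme) (*Options, error) {
        opts, err := DefaultOptions("my-router", scheme)
        if err != nil {
            return nil, err
        }
        opts.GVKQueueSplitters = map[schema.GroupVersionKind]nruntime.WorkerQueueSplitter{
            // Only this GVK gets a custom split; the index returned by
            // Split selects which of the splitter's queues handles a key.
            {Group: "example.com", Version: "v1", Kind: "Widget"}: &prioritySplitter{},
        }
        return opts, nil
    }

The populated Options value is consumed by (*Options).complete alongside the existing GVKThreadiness map, so no other caller-side changes are required.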