diff --git a/pkg/backend/backend.go b/pkg/backend/backend.go
index 171b369..74320b7 100644
--- a/pkg/backend/backend.go
+++ b/pkg/backend/backend.go
@@ -13,7 +13,7 @@ import (
 type Callback func(gvk schema.GroupVersionKind, key string, obj runtime.Object) (runtime.Object, error)
 
 type Trigger interface {
-	Trigger(gvk schema.GroupVersionKind, key string, delay time.Duration) error
+	Trigger(ctx context.Context, gvk schema.GroupVersionKind, key string, delay time.Duration) error
 }
 
 type Watcher interface {
diff --git a/pkg/router/handler.go b/pkg/router/handler.go
index 9e9ca87..625d459 100644
--- a/pkg/router/handler.go
+++ b/pkg/router/handler.go
@@ -233,7 +233,7 @@ func (m *HandlerSet) checkDelay(gvk schema.GroupVersionKind, key string) bool {
 			m.limiterLock.Lock()
 			defer m.limiterLock.Unlock()
 			delete(m.waiting, lKey)
-			_ = m.backend.Trigger(gvk, ReplayPrefix+key, 0)
+			_ = m.backend.Trigger(m.ctx, gvk, ReplayPrefix+key, 0)
 		}()
 		return false
 	}
@@ -341,7 +341,7 @@ func (m *HandlerSet) handle(gvk schema.GroupVersionKind, key string, unmodifiedO
 	req.Object = newObj
 
 	if resp.delay > 0 {
-		if err := m.backend.Trigger(gvk, key, resp.delay); err != nil {
+		if err := m.backend.Trigger(m.ctx, gvk, key, resp.delay); err != nil {
 			return nil, err
 		}
 	}
diff --git a/pkg/router/trigger.go b/pkg/router/trigger.go
index 8665332..a60e125 100644
--- a/pkg/router/trigger.go
+++ b/pkg/router/trigger.go
@@ -44,7 +44,7 @@ func (m *triggers) invokeTriggers(req Request) {
 		for _, matcher := range matchers {
 			if matcher.Match(req.Namespace, req.Name, req.Object) {
 				log.Debugf("Triggering [%s] [%v] from [%s] [%v]", et.key, et.gvk, req.Key, req.GVK)
-				_ = m.trigger.Trigger(et.gvk, et.key, 0)
+				_ = m.trigger.Trigger(req.Ctx, et.gvk, et.key, 0)
 				break
 			}
 		}
@@ -133,7 +133,7 @@ func (m *triggers) UnregisterAndTrigger(req Request) {
 			}
 			if targetGVK == req.GVK && mt.Match(req.Namespace, req.Name, req.Object) {
 				log.Debugf("Triggering [%s] [%v] from [%s] [%v] on delete", target.key, target.gvk, req.Key, req.GVK)
-				_ = m.trigger.Trigger(target.gvk, target.key, 0)
+				_ = m.trigger.Trigger(req.Ctx, target.gvk, target.key, 0)
 			}
 		}
 	}
diff --git a/pkg/runtime/backend.go b/pkg/runtime/backend.go
index 86b9895..1f6a7fc 100644
--- a/pkg/runtime/backend.go
+++ b/pkg/runtime/backend.go
@@ -82,8 +82,8 @@ func (b *Backend) start(ctx context.Context, preloadOnly bool) (err error) {
 	return nil
 }
 
-func (b *Backend) Trigger(gvk schema.GroupVersionKind, key string, delay time.Duration) error {
-	controller, err := b.cacheFactory.ForKind(gvk)
+func (b *Backend) Trigger(ctx context.Context, gvk schema.GroupVersionKind, key string, delay time.Duration) error {
+	controller, err := b.cacheFactory.ForKind(ctx, gvk)
 	if err != nil {
 		return err
 	}
@@ -138,7 +138,7 @@ func (b *Backend) addIndexer(ctx context.Context, gvk schema.GroupVersionKind) e
 }
 
 func (b *Backend) Watcher(ctx context.Context, gvk schema.GroupVersionKind, name string, cb backend.Callback) error {
-	c, err := b.cacheFactory.ForKind(gvk)
+	c, err := b.cacheFactory.ForKind(ctx, gvk)
 	if err != nil {
 		return err
 	}
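The interface change above threads a context.Context through the entire Trigger path, ending at cacheFactory.ForKind, which constructs controllers lazily and needs a context to bound that work. Call sites now pass a real context — the handler set's m.ctx or the request's req.Ctx — instead of leaving New to fall back on context.TODO(). A minimal sketch of a conforming implementation, useful as a test double; the type and channel here are illustrative, not part of this change:

package example

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

// recordingTrigger satisfies the updated backend.Trigger interface by
// recording keys instead of enqueuing real work.
type recordingTrigger struct {
	keys chan string
}

func (r *recordingTrigger) Trigger(ctx context.Context, gvk schema.GroupVersionKind, key string, delay time.Duration) error {
	select {
	case r.keys <- key:
		return nil
	case <-ctx.Done():
		// The caller's context ended before the key could be handed off.
		return ctx.Err()
	}
}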
diff --git a/pkg/runtime/clients.go b/pkg/runtime/clients.go
index 34165d4..1c20d3b 100644
--- a/pkg/runtime/clients.go
+++ b/pkg/runtime/clients.go
@@ -19,7 +19,8 @@ type Runtime struct {
 
 type Config struct {
 	GroupConfig
-	GVKThreadiness map[schema.GroupVersionKind]int
+	GVKThreadiness    map[schema.GroupVersionKind]int
+	GVKQueueSplitters map[schema.GroupVersionKind]WorkerQueueSplitter
 }
 
 type GroupConfig struct {
@@ -69,7 +70,8 @@ func NewRuntimeWithConfigs(defaultConfig Config, apiGroupConfigs map[string]Grou
 	aggCache := multi.NewCache(scheme, theCache, caches)
 
 	factory := NewSharedControllerFactory(aggUncachedClient, aggCache, &SharedControllerFactoryOptions{
-		KindWorkers: defaultConfig.GVKThreadiness,
+		KindWorkers:       defaultConfig.GVKThreadiness,
+		KindQueueSplitter: defaultConfig.GVKQueueSplitters,
 		// In nah this is only invoked when a key fails to process
 		DefaultRateLimiter: workqueue.NewTypedMaxOfRateLimiter(
 			// This will go .5, 1, 2, 4, 8 seconds, etc up until 15 minutes
diff --git a/pkg/runtime/controller.go b/pkg/runtime/controller.go
index 6132f16..f84ab82 100644
--- a/pkg/runtime/controller.go
+++ b/pkg/runtime/controller.go
@@ -14,7 +14,6 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
-	"k8s.io/apimachinery/pkg/util/wait"
 	clientgocache "k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
 	"sigs.k8s.io/controller-runtime/pkg/cache"
@@ -49,7 +48,7 @@ type controller struct {
 	startLock sync.Mutex
 
 	name        string
-	workqueue   workqueue.TypedRateLimitingInterface[any]
+	workqueues  []workqueue.TypedRateLimitingInterface[any]
 	rateLimiter workqueue.TypedRateLimiter[any]
 	informer    cache.Informer
 	handler     Handler
@@ -59,6 +58,7 @@ type controller struct {
 	registration clientgocache.ResourceEventHandlerRegistration
 	obj          runtime.Object
 	cache        cache.Cache
+	splitter     WorkerQueueSplitter
 }
 
 type startKey struct {
@@ -67,10 +67,26 @@ type startKey struct {
 }
 
 type Options struct {
-	RateLimiter workqueue.TypedRateLimiter[any]
+	RateLimiter   workqueue.TypedRateLimiter[any]
+	QueueSplitter WorkerQueueSplitter
 }
 
-func New(gvk schema.GroupVersionKind, scheme *runtime.Scheme, cache cache.Cache, handler Handler, opts *Options) (Controller, error) {
+type WorkerQueueSplitter interface {
+	Queues() int
+	Split(key string) int
+}
+
+type singleWorkerQueueSplitter struct{}
+
+func (*singleWorkerQueueSplitter) Queues() int {
+	return 1
+}
+
+func (*singleWorkerQueueSplitter) Split(string) int {
+	return 0
+}
+
+func New(ctx context.Context, gvk schema.GroupVersionKind, scheme *runtime.Scheme, cache cache.Cache, handler Handler, opts *Options) (Controller, error) {
 	opts = applyDefaultOptions(opts)
 
 	obj, err := newObject(scheme, gvk)
@@ -78,7 +94,7 @@ func New(gvk schema.GroupVersionKind, scheme *runtime.Scheme, cache cache.Cache,
 		return nil, err
 	}
 
-	informer, err := cache.GetInformerForKind(context.TODO(), gvk)
+	informer, err := cache.GetInformerForKind(ctx, gvk)
 	if err != nil {
 		return nil, err
 	}
@@ -91,6 +107,7 @@ func New(gvk schema.GroupVersionKind, scheme *runtime.Scheme, cache cache.Cache,
 		obj:         obj,
 		rateLimiter: opts.RateLimiter,
 		informer:    informer,
+		splitter:    opts.QueueSplitter,
 	}
 
 	return controller, nil
@@ -115,6 +132,9 @@ func applyDefaultOptions(opts *Options) *Options {
 			workqueue.NewTypedItemExponentialFailureRateLimiter[any](5*time.Millisecond, 30*time.Second),
 		)
 	}
+	if newOpts.QueueSplitter == nil {
+		newOpts.QueueSplitter = (*singleWorkerQueueSplitter)(nil)
+	}
 
 	return &newOpts
 }
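WorkerQueueSplitter is the new extension point: Queues says how many workqueues a controller fans out to, and Split deterministically maps an enqueued key to one of them, so a given object always lands on the same queue (preserving per-object ordering) while unrelated objects stop contending for a single queue. The default above keeps one queue, preserving the old behavior. A hypothetical splitter that shards by namespace might look like this — the type and its name are illustrative, not part of the diff:

package example

import (
	"hash/fnv"
	"strings"
)

// namespaceSplitter spreads keys over n queues, keeping every key from a
// given namespace on the same queue. n must be at least 1, and Split always
// returns a value in [0, n).
type namespaceSplitter struct {
	n int
}

func (s *namespaceSplitter) Queues() int {
	return s.n
}

func (s *namespaceSplitter) Split(key string) int {
	// Keys look like "namespace/name"; cluster-scoped keys have no "/".
	ns, _, found := strings.Cut(key, "/")
	if !found {
		ns = key
	}
	h := fnv.New32a()
	_, _ = h.Write([]byte(ns))
	return int(h.Sum32() % uint32(s.n))
}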
@@ -136,32 +156,27 @@ func (c *controller) run(ctx context.Context, workers int) {
 	// will create a goroutine under the hood. If we instantiate a workqueue we must have
 	// a mechanism to shut it down. Without the stopCh we don't know when to shut down
 	// the queue and release the goroutine
-	c.workqueue = workqueue.NewTypedRateLimitingQueueWithConfig(c.rateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{Name: c.name})
+	c.workqueues = make([]workqueue.TypedRateLimitingInterface[any], c.splitter.Queues())
+	for i := range c.workqueues {
+		c.workqueues[i] = workqueue.NewTypedRateLimitingQueueWithConfig(c.rateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{Name: fmt.Sprintf("%s-%d", c.name, i)})
+	}
 
 	for _, start := range c.startKeys {
 		if start.after == 0 {
-			c.workqueue.Add(start.key)
+			c.workqueues[c.splitter.Split(start.key)].Add(start.key)
 		} else {
-			c.workqueue.AddAfter(start.key, start.after)
+			c.workqueues[c.splitter.Split(start.key)].AddAfter(start.key, start.after)
 		}
 	}
 	c.startKeys = nil
 	c.startLock.Unlock()
 
 	defer utilruntime.HandleCrash()
-	defer func() {
-		c.workqueue.ShutDown()
-	}()
 
 	// Start the informer factories to begin populating the informer caches
 	log.Infof("Starting %s controller", c.name)
-	for i := 0; i < workers; i++ {
-		go wait.Until(func() {
-			c.runWorker(ctx)
-		}, time.Second, ctx.Done())
-	}
+	c.runWorkers(ctx, workers)
 
-	<-ctx.Done()
 	c.startLock.Lock()
 	defer c.startLock.Unlock()
 	c.started = false
@@ -217,47 +232,77 @@ func (c *controller) Start(ctx context.Context, workers int) error {
 	return nil
 }
 
-func (c *controller) runWorker(ctx context.Context) {
-	for c.processNextWorkItem(ctx) {
+func (c *controller) runWorkers(ctx context.Context, workers int) {
+	wait := sync.WaitGroup{}
+	workers = workers / len(c.workqueues)
+	if workers == 0 {
+		workers = 1
 	}
-}
 
-func (c *controller) processNextWorkItem(ctx context.Context) bool {
-	obj, shutdown := c.workqueue.Get()
+	defer func() {
+		defer wait.Wait()
+	}()
 
-	if shutdown {
-		return false
+	for _, queue := range c.workqueues {
+		go func() {
+			// This channel acts as a semaphore to limit the number of concurrent
+			// work items handled by this controller.
+			running := make(chan struct{}, workers)
+			defer close(running)
+
+			for {
+				obj, shutdown := queue.Get()
+
+				if shutdown {
+					return
+				}
+
+				// Acquire from the semaphore
+				running <- struct{}{}
+				wait.Add(1)
+
+				go func() {
+					defer func() {
+						// Release to the semaphore
+						<-running
+						wait.Done()
+					}()
+
+					if err := c.processSingleItem(ctx, queue, obj); err != nil {
+						if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") {
+							log.Errorf("%v", err)
+						}
+					}
+				}()
+			}
+		}()
 	}
 
-	if err := c.processSingleItem(ctx, obj); err != nil {
-		if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") {
-			log.Errorf("%v", err)
-		}
-		return true
+	<-ctx.Done()
+	for i := range c.workqueues {
+		c.workqueues[i].ShutDown()
 	}
-
-	return true
 }
 
-func (c *controller) processSingleItem(ctx context.Context, obj interface{}) error {
+func (c *controller) processSingleItem(ctx context.Context, queue workqueue.TypedRateLimitingInterface[any], obj interface{}) error {
 	var (
 		key string
 		ok  bool
 	)
 
-	defer c.workqueue.Done(obj)
+	defer queue.Done(obj)
 
 	if key, ok = obj.(string); !ok {
-		c.workqueue.Forget(obj)
+		queue.Forget(obj)
 		log.Errorf("expected string in workqueue but got %#v", obj)
 		return nil
 	}
 
 	if err := c.syncHandler(ctx, key); err != nil {
-		c.workqueue.AddRateLimited(key)
+		queue.AddRateLimited(key)
 		return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
 	}
-	c.workqueue.Forget(obj)
+	queue.Forget(obj)
 
 	return nil
}
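runWorkers divides the configured worker count evenly across the queues (with a floor of one) and then bounds each queue's concurrency with a buffered channel acting as a counting semaphore, while the WaitGroup lets the deferred Wait block shutdown until in-flight items drain. Because Get blocks, each queue's dispatch goroutine only exits once ShutDown is called after ctx is done. The same pattern reduced to its essentials, with process standing in for processSingleItem:

package example

import "sync"

// runBounded handles items from ch with at most limit concurrent goroutines
// and returns only once the channel is closed and all work has finished.
func runBounded(ch <-chan string, limit int, process func(string)) {
	var wg sync.WaitGroup
	running := make(chan struct{}, limit) // counting semaphore

	for item := range ch {
		running <- struct{}{} // acquire a slot; blocks when limit is reached
		wg.Add(1)
		go func(it string) {
			defer func() {
				<-running // release the slot
				wg.Done()
			}()
			process(it)
		}(item)
	}
	wg.Wait()
}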
@@ -271,7 +316,7 @@ func (c *controller) syncHandler(ctx context.Context, key string) error {
 		return c.handler.OnChange(key, nil)
 	}
 
-	ns, name := keyParse(key)
+	ns, name := KeyParse(key)
 	obj := c.obj.DeepCopyObject().(kclient.Object)
 	err := c.cache.Get(ctx, kclient.ObjectKey{
 		Name:      name,
@@ -290,10 +335,10 @@ func (c *controller) EnqueueKey(key string) {
 	c.startLock.Lock()
 	defer c.startLock.Unlock()
 
-	if c.workqueue == nil {
+	if c.workqueues == nil {
 		c.startKeys = append(c.startKeys, startKey{key: key})
 	} else {
-		c.workqueue.Add(key)
+		c.workqueues[c.splitter.Split(key)].Add(key)
 	}
 }
 
@@ -303,10 +348,10 @@ func (c *controller) Enqueue(namespace, name string) {
 	c.startLock.Lock()
 	defer c.startLock.Unlock()
 
-	if c.workqueue == nil {
+	if c.workqueues == nil {
 		c.startKeys = append(c.startKeys, startKey{key: key})
 	} else {
-		c.workqueue.AddRateLimited(key)
+		c.workqueues[c.splitter.Split(key)].AddRateLimited(key)
 	}
 }
 
@@ -316,15 +361,19 @@ func (c *controller) EnqueueAfter(namespace, name string, duration time.Duration
 	c.startLock.Lock()
 	defer c.startLock.Unlock()
 
-	if c.workqueue == nil {
+	if c.workqueues == nil {
 		c.startKeys = append(c.startKeys, startKey{key: key, after: duration})
 	} else {
-		c.workqueue.AddAfter(key, duration)
+		c.workqueues[c.splitter.Split(key)].AddAfter(key, duration)
 	}
 }
 
-func keyParse(key string) (namespace string, name string) {
-	var ok bool
+func KeyParse(key string) (namespace string, name string) {
+	special, key, ok := strings.Cut(key, " ")
+	if !ok {
+		key = special
+	}
+
 	namespace, name, ok = strings.Cut(key, "/")
 	if !ok {
 		name = namespace
@@ -348,10 +397,10 @@ func (c *controller) enqueue(obj interface{}) {
 		return
 	}
 	c.startLock.Lock()
-	if c.workqueue == nil {
+	if c.workqueues == nil {
 		c.startKeys = append(c.startKeys, startKey{key: key})
 	} else {
-		c.workqueue.Add(key)
+		c.workqueues[c.splitter.Split(key)].Add(key)
 	}
 	c.startLock.Unlock()
 }
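keyParse is now exported as KeyParse and learned to drop an optional space-separated prefix before splitting "namespace/name" — this is what lets prefixed keys, such as the ReplayPrefix keys enqueued in handler.go, still resolve to the underlying object. A self-contained copy for illustration; the tail after the visible hunk context is reconstructed, and "replay" is a stand-in prefix, not the real ReplayPrefix value:

package main

import (
	"fmt"
	"strings"
)

// KeyParse mirrors the new implementation above: strip an optional
// space-separated prefix, then split the rest on the first "/".
func KeyParse(key string) (namespace string, name string) {
	special, key, ok := strings.Cut(key, " ")
	if !ok {
		key = special
	}

	namespace, name, ok = strings.Cut(key, "/")
	if !ok {
		name = namespace
		namespace = ""
	}
	return namespace, name
}

func main() {
	fmt.Println(KeyParse("ns1/obj1"))        // ns1 obj1
	fmt.Println(KeyParse("obj1"))            // "" obj1 (cluster-scoped)
	fmt.Println(KeyParse("replay ns1/obj1")) // ns1 obj1 (prefix dropped)
}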
diff --git a/pkg/runtime/sharedcontrollerfactory.go b/pkg/runtime/sharedcontrollerfactory.go
index 32b0f9b..b57e08c 100644
--- a/pkg/runtime/sharedcontrollerfactory.go
+++ b/pkg/runtime/sharedcontrollerfactory.go
@@ -11,7 +11,7 @@ import (
 )
 
 type SharedControllerFactory interface {
-	ForKind(gvk schema.GroupVersionKind) (SharedController, error)
+	ForKind(ctx context.Context, gvk schema.GroupVersionKind) (SharedController, error)
 	Preload(ctx context.Context) error
 	Start(ctx context.Context) error
 }
@@ -20,8 +20,9 @@ type SharedControllerFactoryOptions struct {
 	DefaultRateLimiter workqueue.TypedRateLimiter[any]
 	DefaultWorkers     int
 
-	KindRateLimiter map[schema.GroupVersionKind]workqueue.TypedRateLimiter[any]
-	KindWorkers     map[schema.GroupVersionKind]int
+	KindRateLimiter   map[schema.GroupVersionKind]workqueue.TypedRateLimiter[any]
+	KindWorkers       map[schema.GroupVersionKind]int
+	KindQueueSplitter map[schema.GroupVersionKind]WorkerQueueSplitter
 }
 
 type sharedControllerFactory struct {
@@ -33,22 +34,24 @@ type sharedControllerFactory struct {
 	cache       cache.Cache
 	client      kclient.Client
 	controllers map[schema.GroupVersionKind]*sharedController
 
-	rateLimiter     workqueue.TypedRateLimiter[any]
-	workers         int
-	kindRateLimiter map[schema.GroupVersionKind]workqueue.TypedRateLimiter[any]
-	kindWorkers     map[schema.GroupVersionKind]int
+	rateLimiter       workqueue.TypedRateLimiter[any]
+	workers           int
+	kindRateLimiter   map[schema.GroupVersionKind]workqueue.TypedRateLimiter[any]
+	kindWorkers       map[schema.GroupVersionKind]int
+	kindQueueSplitter map[schema.GroupVersionKind]WorkerQueueSplitter
 }
 
 func NewSharedControllerFactory(c kclient.Client, cache cache.Cache, opts *SharedControllerFactoryOptions) SharedControllerFactory {
 	opts = applyDefaultSharedOptions(opts)
 	return &sharedControllerFactory{
-		cache:           cache,
-		client:          c,
-		controllers:     map[schema.GroupVersionKind]*sharedController{},
-		workers:         opts.DefaultWorkers,
-		kindWorkers:     opts.KindWorkers,
-		rateLimiter:     opts.DefaultRateLimiter,
-		kindRateLimiter: opts.KindRateLimiter,
+		cache:             cache,
+		client:            c,
+		controllers:       map[schema.GroupVersionKind]*sharedController{},
+		workers:           opts.DefaultWorkers,
+		kindWorkers:       opts.KindWorkers,
+		rateLimiter:       opts.DefaultRateLimiter,
+		kindRateLimiter:   opts.KindRateLimiter,
+		kindQueueSplitter: opts.KindQueueSplitter,
 	}
 }
@@ -114,7 +117,7 @@ func (s *sharedControllerFactory) loadAndStart(ctx context.Context, start bool)
 	return nil
 }
 
-func (s *sharedControllerFactory) ForKind(gvk schema.GroupVersionKind) (SharedController, error) {
+func (s *sharedControllerFactory) ForKind(ctx context.Context, gvk schema.GroupVersionKind) (SharedController, error) {
 	controllerResult := s.byGVK(gvk)
 	if controllerResult != nil {
 		return controllerResult, nil
@@ -137,8 +140,9 @@ func (s *sharedControllerFactory) ForKind(gvk schema.GroupVersionKind) (SharedCo
 				rateLimiter = s.rateLimiter
 			}
 
-			return New(gvk, s.client.Scheme(), s.cache, handler, &Options{
-				RateLimiter: rateLimiter,
+			return New(ctx, gvk, s.client.Scheme(), s.cache, handler, &Options{
+				RateLimiter:   rateLimiter,
+				QueueSplitter: s.kindQueueSplitter[gvk],
 			})
 		},
 		handler: handler,
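ForKind takes the caller's context because it is the lazy-construction entry point: the first caller for a GVK pays the cost of building the shared controller and its informer, and ctx bounds that setup. A hypothetical caller, assuming it lives alongside these types in the same package (the GVK literal is an example):

package runtime

import (
	"context"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

// ensureConfigMapController forces lazy construction of the shared
// controller for ConfigMaps; later callers get the cached instance.
func ensureConfigMapController(ctx context.Context, factory SharedControllerFactory) (SharedController, error) {
	return factory.ForKind(ctx, schema.GroupVersionKind{Version: "v1", Kind: "ConfigMap"})
}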
diff --git a/router.go b/router.go
index 7b5fdba..ab53120 100644
--- a/router.go
+++ b/router.go
@@ -33,6 +33,8 @@ type Options struct {
 	HealthzPort int
 	// Change the threadedness per GVK
 	GVKThreadiness map[schema.GroupVersionKind]int
+	// Split the worker queues for these GVKs
+	GVKQueueSplitters map[schema.GroupVersionKind]nruntime.WorkerQueueSplitter
 }
 
 func (o *Options) complete() (*Options, error) {
@@ -61,7 +63,14 @@ func (o *Options) complete() (*Options, error) {
 		}
 	}
-	defaultConfig := nruntime.Config{GroupConfig: nruntime.GroupConfig{Rest: result.DefaultRESTConfig, Namespace: result.DefaultNamespace}, GVKThreadiness: result.GVKThreadiness}
+	defaultConfig := nruntime.Config{
+		GroupConfig: nruntime.GroupConfig{
+			Rest:      result.DefaultRESTConfig,
+			Namespace: result.DefaultNamespace,
+		},
+		GVKThreadiness:    result.GVKThreadiness,
+		GVKQueueSplitters: result.GVKQueueSplitters,
+	}
 	backend, err := nruntime.NewRuntimeWithConfigs(defaultConfig, result.APIGroupConfigs, result.Scheme)
 	if err != nil {
 		return nil, err
 	}
@@ -93,7 +102,7 @@ func DefaultOptions(routerName string, scheme *runtime.Scheme) (*Options, error)
 }
 
 // DefaultRouter The routerName is important as this name will be used to assign ownership of objects created by this
-// router. Specifically the routerName is assigned to the sub-context in the apply actions. Additionally, the routerName
+// router. Specifically, the routerName is assigned to the sub-context in the apply actions. Additionally, the routerName
 // will be used for the leader election lease lock.
 func DefaultRouter(routerName string, scheme *runtime.Scheme) (*router.Router, error) {
 	opts, err := DefaultOptions(routerName, scheme)
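At the top level, the new Options field is the only thing a consumer needs to touch. A hypothetical wiring that gives one hot GVK several queues — the module path is assumed, and namespaceSplitter refers to the illustrative splitter sketched earlier, neither of which is part of this change:

package main

import (
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"

	// Assumed import paths; adjust to this repository's real module path.
	nah "github.com/obot-platform/nah"
	nruntime "github.com/obot-platform/nah/pkg/runtime"
)

func buildOptions(scheme *runtime.Scheme) (*nah.Options, error) {
	opts, err := nah.DefaultOptions("my-router", scheme)
	if err != nil {
		return nil, err
	}

	podGVK := schema.GroupVersionKind{Version: "v1", Kind: "Pod"}

	// 32 workers spread over 4 queues means runWorkers gives each queue
	// 8 concurrent slots; namespaceSplitter is from the earlier sketch.
	opts.GVKThreadiness = map[schema.GroupVersionKind]int{podGVK: 32}
	opts.GVKQueueSplitters = map[schema.GroupVersionKind]nruntime.WorkerQueueSplitter{
		podGVK: &namespaceSplitter{n: 4},
	}
	return opts, nil
}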