From 1f6217c0046fc56fc72c743f2672d688f08acac4 Mon Sep 17 00:00:00 2001 From: Donnie Adams Date: Fri, 7 Feb 2025 14:05:04 -0500 Subject: [PATCH] enhance: start worker goroutines on demand Instead of having a bunch of waiting goroutines, this change will introduce one goroutine per type that waits for new tasks. That goroutine will spin up new goroutines, up to the desired limit, to handle tasks as they come in. Signed-off-by: Donnie Adams --- pkg/runtime/controller.go | 59 ++++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 25 deletions(-) diff --git a/pkg/runtime/controller.go b/pkg/runtime/controller.go index 6132f16..08bb9a2 100644 --- a/pkg/runtime/controller.go +++ b/pkg/runtime/controller.go @@ -14,7 +14,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" clientgocache "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -148,20 +147,12 @@ func (c *controller) run(ctx context.Context, workers int) { c.startLock.Unlock() defer utilruntime.HandleCrash() - defer func() { - c.workqueue.ShutDown() - }() // Start the informer factories to begin populating the informer caches log.Infof("Starting %s controller", c.name) - for i := 0; i < workers; i++ { - go wait.Until(func() { - c.runWorker(ctx) - }, time.Second, ctx.Done()) - } + c.runWorkers(ctx, workers) - <-ctx.Done() c.startLock.Lock() defer c.startLock.Unlock() c.started = false @@ -217,26 +208,44 @@ func (c *controller) Start(ctx context.Context, workers int) error { return nil } -func (c *controller) runWorker(ctx context.Context) { - for c.processNextWorkItem(ctx) { - } -} +func (c *controller) runWorkers(ctx context.Context, workers int) { + wait := sync.WaitGroup{} + running := make(chan struct{}, workers) -func (c *controller) processNextWorkItem(ctx context.Context) bool { - obj, shutdown := c.workqueue.Get() + defer func() { + defer wait.Wait() + }() - if shutdown { - return false - } + defer close(running) - if err := c.processSingleItem(ctx, obj); err != nil { - if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") { - log.Errorf("%v", err) + go func() { + <-ctx.Done() + c.workqueue.ShutDown() + }() + + for { + obj, shutdown := c.workqueue.Get() + + if shutdown { + return } - return true - } - return true + running <- struct{}{} + wait.Add(1) + + go func() { + defer func() { + <-running + wait.Done() + }() + + if err := c.processSingleItem(ctx, obj); err != nil { + if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") { + log.Errorf("%v", err) + } + } + }() + } } func (c *controller) processSingleItem(ctx context.Context, obj interface{}) error {