diff --git a/pkg/runtime/controller.go b/pkg/runtime/controller.go index 6132f16..08bb9a2 100644 --- a/pkg/runtime/controller.go +++ b/pkg/runtime/controller.go @@ -14,7 +14,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" clientgocache "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -148,20 +147,12 @@ func (c *controller) run(ctx context.Context, workers int) { c.startLock.Unlock() defer utilruntime.HandleCrash() - defer func() { - c.workqueue.ShutDown() - }() // Start the informer factories to begin populating the informer caches log.Infof("Starting %s controller", c.name) - for i := 0; i < workers; i++ { - go wait.Until(func() { - c.runWorker(ctx) - }, time.Second, ctx.Done()) - } + c.runWorkers(ctx, workers) - <-ctx.Done() c.startLock.Lock() defer c.startLock.Unlock() c.started = false @@ -217,26 +208,44 @@ func (c *controller) Start(ctx context.Context, workers int) error { return nil } -func (c *controller) runWorker(ctx context.Context) { - for c.processNextWorkItem(ctx) { - } -} +func (c *controller) runWorkers(ctx context.Context, workers int) { + wait := sync.WaitGroup{} + running := make(chan struct{}, workers) -func (c *controller) processNextWorkItem(ctx context.Context) bool { - obj, shutdown := c.workqueue.Get() + defer func() { + defer wait.Wait() + }() - if shutdown { - return false - } + defer close(running) - if err := c.processSingleItem(ctx, obj); err != nil { - if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") { - log.Errorf("%v", err) + go func() { + <-ctx.Done() + c.workqueue.ShutDown() + }() + + for { + obj, shutdown := c.workqueue.Get() + + if shutdown { + return } - return true - } - return true + running <- struct{}{} + wait.Add(1) + + go func() { + defer func() { + <-running + wait.Done() + }() + + if err := c.processSingleItem(ctx, obj); err != nil { + if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") { + log.Errorf("%v", err) + } + } + }() + } } func (c *controller) processSingleItem(ctx context.Context, obj interface{}) error {