Skip to content

Commit

Permalink
enhance: start worker goroutines on demand
Browse files Browse the repository at this point in the history
Instead of having a bunch of waiting goroutines, this change will
introduce one goroutine per type that waits for new tasks. That
goroutine will spin up new goroutines, up to the desired limit, to
handle tasks as they come in.

Signed-off-by: Donnie Adams <[email protected]>
  • Loading branch information
thedadams committed Feb 7, 2025
1 parent 3b7c581 commit 1f6217c
Showing 1 changed file with 34 additions and 25 deletions.
59 changes: 34 additions & 25 deletions pkg/runtime/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
clientgocache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/cache"
Expand Down Expand Up @@ -148,20 +147,12 @@ func (c *controller) run(ctx context.Context, workers int) {
c.startLock.Unlock()

defer utilruntime.HandleCrash()
defer func() {
c.workqueue.ShutDown()
}()

// Start the informer factories to begin populating the informer caches
log.Infof("Starting %s controller", c.name)

for i := 0; i < workers; i++ {
go wait.Until(func() {
c.runWorker(ctx)
}, time.Second, ctx.Done())
}
c.runWorkers(ctx, workers)

<-ctx.Done()
c.startLock.Lock()
defer c.startLock.Unlock()
c.started = false
Expand Down Expand Up @@ -217,26 +208,44 @@ func (c *controller) Start(ctx context.Context, workers int) error {
return nil
}

func (c *controller) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
func (c *controller) runWorkers(ctx context.Context, workers int) {
wait := sync.WaitGroup{}
running := make(chan struct{}, workers)

func (c *controller) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := c.workqueue.Get()
defer func() {
defer wait.Wait()
}()

if shutdown {
return false
}
defer close(running)

if err := c.processSingleItem(ctx, obj); err != nil {
if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") {
log.Errorf("%v", err)
go func() {
<-ctx.Done()
c.workqueue.ShutDown()
}()

for {
obj, shutdown := c.workqueue.Get()

if shutdown {
return
}
return true
}

return true
running <- struct{}{}
wait.Add(1)

go func() {
defer func() {
<-running
wait.Done()
}()

if err := c.processSingleItem(ctx, obj); err != nil {
if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") {
log.Errorf("%v", err)
}
}
}()
}
}

func (c *controller) processSingleItem(ctx context.Context, obj interface{}) error {
Expand Down

0 comments on commit 1f6217c

Please sign in to comment.