Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: start worker goroutines on demand #12

Merged
merged 2 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type Callback func(gvk schema.GroupVersionKind, key string, obj runtime.Object) (runtime.Object, error)

type Trigger interface {
Trigger(gvk schema.GroupVersionKind, key string, delay time.Duration) error
Trigger(ctx context.Context, gvk schema.GroupVersionKind, key string, delay time.Duration) error
}

type Watcher interface {
Expand Down
4 changes: 2 additions & 2 deletions pkg/router/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (m *HandlerSet) checkDelay(gvk schema.GroupVersionKind, key string) bool {
m.limiterLock.Lock()
defer m.limiterLock.Unlock()
delete(m.waiting, lKey)
_ = m.backend.Trigger(gvk, ReplayPrefix+key, 0)
_ = m.backend.Trigger(m.ctx, gvk, ReplayPrefix+key, 0)
}()
return false
}
Expand Down Expand Up @@ -341,7 +341,7 @@ func (m *HandlerSet) handle(gvk schema.GroupVersionKind, key string, unmodifiedO
req.Object = newObj

if resp.delay > 0 {
if err := m.backend.Trigger(gvk, key, resp.delay); err != nil {
if err := m.backend.Trigger(m.ctx, gvk, key, resp.delay); err != nil {
return nil, err
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/router/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (m *triggers) invokeTriggers(req Request) {
for _, matcher := range matchers {
if matcher.Match(req.Namespace, req.Name, req.Object) {
log.Debugf("Triggering [%s] [%v] from [%s] [%v]", et.key, et.gvk, req.Key, req.GVK)
_ = m.trigger.Trigger(et.gvk, et.key, 0)
_ = m.trigger.Trigger(req.Ctx, et.gvk, et.key, 0)
break
}
}
Expand Down Expand Up @@ -133,7 +133,7 @@ func (m *triggers) UnregisterAndTrigger(req Request) {
}
if targetGVK == req.GVK && mt.Match(req.Namespace, req.Name, req.Object) {
log.Debugf("Triggering [%s] [%v] from [%s] [%v] on delete", target.key, target.gvk, req.Key, req.GVK)
_ = m.trigger.Trigger(target.gvk, target.key, 0)
_ = m.trigger.Trigger(req.Ctx, target.gvk, target.key, 0)
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/runtime/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ func (b *Backend) start(ctx context.Context, preloadOnly bool) (err error) {
return nil
}

func (b *Backend) Trigger(gvk schema.GroupVersionKind, key string, delay time.Duration) error {
controller, err := b.cacheFactory.ForKind(gvk)
func (b *Backend) Trigger(ctx context.Context, gvk schema.GroupVersionKind, key string, delay time.Duration) error {
controller, err := b.cacheFactory.ForKind(ctx, gvk)
if err != nil {
return err
}
Expand Down Expand Up @@ -138,7 +138,7 @@ func (b *Backend) addIndexer(ctx context.Context, gvk schema.GroupVersionKind) e
}

func (b *Backend) Watcher(ctx context.Context, gvk schema.GroupVersionKind, name string, cb backend.Callback) error {
c, err := b.cacheFactory.ForKind(gvk)
c, err := b.cacheFactory.ForKind(ctx, gvk)
if err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/runtime/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ type Runtime struct {

type Config struct {
GroupConfig
GVKThreadiness map[schema.GroupVersionKind]int
GVKThreadiness map[schema.GroupVersionKind]int
GVKQueueSplitters map[schema.GroupVersionKind]WorkerQueueSplitter
}

type GroupConfig struct {
Expand Down Expand Up @@ -69,7 +70,8 @@ func NewRuntimeWithConfigs(defaultConfig Config, apiGroupConfigs map[string]Grou
aggCache := multi.NewCache(scheme, theCache, caches)

factory := NewSharedControllerFactory(aggUncachedClient, aggCache, &SharedControllerFactoryOptions{
KindWorkers: defaultConfig.GVKThreadiness,
KindWorkers: defaultConfig.GVKThreadiness,
KindQueueSplitter: defaultConfig.GVKQueueSplitters,
// In nah this is only invoked when a key fails to process
DefaultRateLimiter: workqueue.NewTypedMaxOfRateLimiter(
// This will go .5, 1, 2, 4, 8 seconds, etc up until 15 minutes
Expand Down
143 changes: 96 additions & 47 deletions pkg/runtime/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
clientgocache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/cache"
Expand Down Expand Up @@ -49,7 +48,7 @@ type controller struct {
startLock sync.Mutex

name string
workqueue workqueue.TypedRateLimitingInterface[any]
workqueues []workqueue.TypedRateLimitingInterface[any]
rateLimiter workqueue.TypedRateLimiter[any]
informer cache.Informer
handler Handler
Expand All @@ -59,6 +58,7 @@ type controller struct {
registration clientgocache.ResourceEventHandlerRegistration
obj runtime.Object
cache cache.Cache
splitter WorkerQueueSplitter
}

type startKey struct {
Expand All @@ -67,18 +67,34 @@ type startKey struct {
}

type Options struct {
RateLimiter workqueue.TypedRateLimiter[any]
RateLimiter workqueue.TypedRateLimiter[any]
QueueSplitter WorkerQueueSplitter
}

func New(gvk schema.GroupVersionKind, scheme *runtime.Scheme, cache cache.Cache, handler Handler, opts *Options) (Controller, error) {
type WorkerQueueSplitter interface {
Queues() int
Split(key string) int
}

type singleWorkerQueueSplitter struct{}

func (*singleWorkerQueueSplitter) Queues() int {
return 1
}

func (*singleWorkerQueueSplitter) Split(string) int {
return 0
}

func New(ctx context.Context, gvk schema.GroupVersionKind, scheme *runtime.Scheme, cache cache.Cache, handler Handler, opts *Options) (Controller, error) {
opts = applyDefaultOptions(opts)

obj, err := newObject(scheme, gvk)
if err != nil {
return nil, err
}

informer, err := cache.GetInformerForKind(context.TODO(), gvk)
informer, err := cache.GetInformerForKind(ctx, gvk)
if err != nil {
return nil, err
}
Expand All @@ -91,6 +107,7 @@ func New(gvk schema.GroupVersionKind, scheme *runtime.Scheme, cache cache.Cache,
obj: obj,
rateLimiter: opts.RateLimiter,
informer: informer,
splitter: opts.QueueSplitter,
}

return controller, nil
Expand All @@ -115,6 +132,9 @@ func applyDefaultOptions(opts *Options) *Options {
workqueue.NewTypedItemExponentialFailureRateLimiter[any](5*time.Millisecond, 30*time.Second),
)
}
if newOpts.QueueSplitter == nil {
newOpts.QueueSplitter = (*singleWorkerQueueSplitter)(nil)
}
return &newOpts
}

Expand All @@ -136,32 +156,27 @@ func (c *controller) run(ctx context.Context, workers int) {
// will create a goroutine under the hood. It we instantiate a workqueue we must have
// a mechanism to Shutdown it down. Without the stopCh we don't know when to shutdown
// the queue and release the goroutine
c.workqueue = workqueue.NewTypedRateLimitingQueueWithConfig(c.rateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{Name: c.name})
c.workqueues = make([]workqueue.TypedRateLimitingInterface[any], c.splitter.Queues())
for i := range c.workqueues {
c.workqueues[i] = workqueue.NewTypedRateLimitingQueueWithConfig(c.rateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{Name: fmt.Sprintf("%s-%d", c.name, i)})
}
for _, start := range c.startKeys {
if start.after == 0 {
c.workqueue.Add(start.key)
c.workqueues[c.splitter.Split(start.key)].Add(start.key)
} else {
c.workqueue.AddAfter(start.key, start.after)
c.workqueues[c.splitter.Split(start.key)].AddAfter(start.key, start.after)
}
}
c.startKeys = nil
c.startLock.Unlock()

defer utilruntime.HandleCrash()
defer func() {
c.workqueue.ShutDown()
}()

// Start the informer factories to begin populating the informer caches
log.Infof("Starting %s controller", c.name)

for i := 0; i < workers; i++ {
go wait.Until(func() {
c.runWorker(ctx)
}, time.Second, ctx.Done())
}
c.runWorkers(ctx, workers)

<-ctx.Done()
c.startLock.Lock()
defer c.startLock.Unlock()
c.started = false
Expand Down Expand Up @@ -217,47 +232,77 @@ func (c *controller) Start(ctx context.Context, workers int) error {
return nil
}

func (c *controller) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
func (c *controller) runWorkers(ctx context.Context, workers int) {
wait := sync.WaitGroup{}
workers = workers / len(c.workqueues)
if workers == 0 {
workers = 1
}
}

func (c *controller) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := c.workqueue.Get()
defer func() {
defer wait.Wait()
}()

if shutdown {
return false
for _, queue := range c.workqueues {
go func() {
// This channel acts as a semaphore to limit the number of concurrent
// work items handled by this controller.
running := make(chan struct{}, workers)
defer close(running)

for {
obj, shutdown := queue.Get()

if shutdown {
return
}

// Acquire from the semaphore
running <- struct{}{}
wait.Add(1)

go func() {
defer func() {
// Release to the semaphore
<-running
wait.Done()
}()

if err := c.processSingleItem(ctx, queue, obj); err != nil {
if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") {
log.Errorf("%v", err)
}
}
}()
}
}()
}

if err := c.processSingleItem(ctx, obj); err != nil {
if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") {
log.Errorf("%v", err)
}
return true
<-ctx.Done()
for i := range c.workqueues {
c.workqueues[i].ShutDown()
}

return true
}

func (c *controller) processSingleItem(ctx context.Context, obj interface{}) error {
func (c *controller) processSingleItem(ctx context.Context, queue workqueue.TypedRateLimitingInterface[any], obj interface{}) error {
var (
key string
ok bool
)

defer c.workqueue.Done(obj)
defer queue.Done(obj)

if key, ok = obj.(string); !ok {
c.workqueue.Forget(obj)
queue.Forget(obj)
log.Errorf("expected string in workqueue but got %#v", obj)
return nil
}
if err := c.syncHandler(ctx, key); err != nil {
c.workqueue.AddRateLimited(key)
queue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}

c.workqueue.Forget(obj)
queue.Forget(obj)
return nil
}

Expand All @@ -271,7 +316,7 @@ func (c *controller) syncHandler(ctx context.Context, key string) error {
return c.handler.OnChange(key, nil)
}

ns, name := keyParse(key)
ns, name := KeyParse(key)
obj := c.obj.DeepCopyObject().(kclient.Object)
err := c.cache.Get(ctx, kclient.ObjectKey{
Name: name,
Expand All @@ -290,10 +335,10 @@ func (c *controller) EnqueueKey(key string) {
c.startLock.Lock()
defer c.startLock.Unlock()

if c.workqueue == nil {
if c.workqueues == nil {
c.startKeys = append(c.startKeys, startKey{key: key})
} else {
c.workqueue.Add(key)
c.workqueues[c.splitter.Split(key)].Add(key)
}
}

Expand All @@ -303,10 +348,10 @@ func (c *controller) Enqueue(namespace, name string) {
c.startLock.Lock()
defer c.startLock.Unlock()

if c.workqueue == nil {
if c.workqueues == nil {
c.startKeys = append(c.startKeys, startKey{key: key})
} else {
c.workqueue.AddRateLimited(key)
c.workqueues[c.splitter.Split(key)].AddRateLimited(key)
}
}

Expand All @@ -316,15 +361,19 @@ func (c *controller) EnqueueAfter(namespace, name string, duration time.Duration
c.startLock.Lock()
defer c.startLock.Unlock()

if c.workqueue == nil {
if c.workqueues == nil {
c.startKeys = append(c.startKeys, startKey{key: key, after: duration})
} else {
c.workqueue.AddAfter(key, duration)
c.workqueues[c.splitter.Split(key)].AddAfter(key, duration)
}
}

func keyParse(key string) (namespace string, name string) {
var ok bool
func KeyParse(key string) (namespace string, name string) {
special, key, ok := strings.Cut(key, " ")
if !ok {
key = special
}

namespace, name, ok = strings.Cut(key, "/")
if !ok {
name = namespace
Expand All @@ -348,10 +397,10 @@ func (c *controller) enqueue(obj interface{}) {
return
}
c.startLock.Lock()
if c.workqueue == nil {
if c.workqueues == nil {
c.startKeys = append(c.startKeys, startKey{key: key})
} else {
c.workqueue.Add(key)
c.workqueues[c.splitter.Split(key)].Add(key)
}
c.startLock.Unlock()
}
Expand Down
Loading