Skip to content

Commit

Permalink
chore: small performance updates
Browse files Browse the repository at this point in the history
Allow client operations to be cached, but not add triggers.

Use a map instead of a slice for storing trigger matchers for quicker
retrieval.

Signed-off-by: Donnie Adams <[email protected]>
  • Loading branch information
thedadams committed Jan 27, 2025
1 parent 2210c89 commit ed22001
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 60 deletions.
4 changes: 2 additions & 2 deletions pkg/router/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewHandlerSet(name string, scheme *runtime.Scheme, backend backend.Backend)
handlers: map[schema.GroupVersionKind][]Handler{},
},
triggers: triggers{
matchers: map[schema.GroupVersionKind]map[enqueueTarget][]objectMatcher{},
matchers: map[schema.GroupVersionKind]map[enqueueTarget]map[string]objectMatcher{},
trigger: backend,
gvkLookup: backend,
scheme: scheme,
Expand Down Expand Up @@ -317,7 +317,7 @@ func (m *HandlerSet) handle(gvk schema.GroupVersionKind, key string, unmodifiedO
}

if unmodifiedObject == nil {
// A nil object here means tha the object was deleted, so unregister the triggers
// A nil object here means that the object was deleted, so unregister the triggers
m.triggers.UnregisterAndTrigger(req)
} else {
m.triggers.Trigger(req)
Expand Down
11 changes: 11 additions & 0 deletions pkg/router/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ type objectMatcher struct {
Fields fields.Selector
}

func (o *objectMatcher) String() string {
s := o.Name + "/" + o.Namespace
if o.Selector != nil {
s += "/label selectors" + o.Selector.String()
}
if o.Fields != nil {
s += "/field selectors" + o.Fields.String()
}
return s
}

func (o *objectMatcher) Equals(other objectMatcher) bool {
if o.Name != other.Name {
return false
Expand Down
6 changes: 3 additions & 3 deletions pkg/router/tester/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

"github.com/google/uuid"
"github.com/obot-platform/nah/pkg/router"
"github.com/obot-platform/nah/pkg/uncached"
"github.com/obot-platform/nah/pkg/untriggered"
"golang.org/x/exp/maps"
"k8s.io/apimachinery/pkg/api/errors"
meta2 "k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -38,7 +38,7 @@ func (c Client) objects() []kclient.Object {
}

func (c *Client) Get(ctx context.Context, key kclient.ObjectKey, out kclient.Object, opts ...kclient.GetOption) error {
if u, ok := out.(*uncached.Holder); ok {
if u, ok := out.(*untriggered.Holder); ok {
out = u.Object
}
t := reflect.TypeOf(out)
Expand Down Expand Up @@ -73,7 +73,7 @@ func copy(dest, src kclient.Object) {
}

func (c *Client) List(ctx context.Context, objList kclient.ObjectList, opts ...kclient.ListOption) error {
if u, ok := objList.(*uncached.HolderList); ok {
if u, ok := objList.(*untriggered.HolderList); ok {
objList = u.ObjectList
}

Expand Down
42 changes: 25 additions & 17 deletions pkg/router/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/obot-platform/nah/pkg/backend"
"github.com/obot-platform/nah/pkg/log"
"github.com/obot-platform/nah/pkg/uncached"
"github.com/obot-platform/nah/pkg/untriggered"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -16,7 +16,7 @@ import (

type triggers struct {
lock sync.RWMutex
matchers map[schema.GroupVersionKind]map[enqueueTarget][]objectMatcher
matchers map[schema.GroupVersionKind]map[enqueueTarget]map[string]objectMatcher
trigger backend.Trigger
gvkLookup backend.Backend
scheme *runtime.Scheme
Expand All @@ -36,15 +36,15 @@ func (m *triggers) invokeTriggers(req Request) {
m.lock.RLock()
defer m.lock.RUnlock()

for enqueueTarget, matchers := range m.matchers[req.GVK] {
if enqueueTarget.gvk == req.GVK &&
enqueueTarget.key == req.Key {
for et, matchers := range m.matchers[req.GVK] {
if et.gvk == req.GVK &&
et.key == req.Key {
continue
}
for _, matcher := range matchers {
if matcher.Match(req.Namespace, req.Name, req.Object) {
log.Debugf("Triggering [%s] [%v] from [%s] [%v]", enqueueTarget.key, enqueueTarget.gvk, req.Key, req.GVK)
_ = m.trigger.Trigger(enqueueTarget.gvk, enqueueTarget.key, 0)
log.Debugf("Triggering [%s] [%v] from [%s] [%v]", et.key, et.gvk, req.Key, req.GVK)
_ = m.trigger.Trigger(et.gvk, et.key, 0)
break
}
}
Expand All @@ -61,15 +61,20 @@ func (m *triggers) register(gvk schema.GroupVersionKind, key string, targetGVK s
}
matchers, ok := m.matchers[targetGVK]
if !ok {
matchers = map[enqueueTarget][]objectMatcher{}
matchers = map[enqueueTarget]map[string]objectMatcher{}
m.matchers[targetGVK] = matchers
}
for _, existing := range matchers[target] {
if existing.Equals(mr) {
return
}

matcherKey := mr.String()
if _, ok := matchers[target][matcherKey]; ok {
return
}

if matchers[target] == nil {
matchers[target] = map[string]objectMatcher{}
}
matchers[target] = append(matchers[target], mr)

matchers[target][matcherKey] = mr
}

func (m *triggers) Trigger(req Request) {
Expand All @@ -79,7 +84,7 @@ func (m *triggers) Trigger(req Request) {
}

func (m *triggers) Register(sourceGVK schema.GroupVersionKind, key string, obj runtime.Object, namespace, name string, selector labels.Selector, fields fields.Selector) (schema.GroupVersionKind, bool, error) {
if uncached.IsWrapped(obj) {
if untriggered.IsWrapped(obj) {
return schema.GroupVersionKind{}, false, nil
}
gvk, err := m.gvkLookup.GVKForObject(obj, m.scheme)
Expand Down Expand Up @@ -107,7 +112,7 @@ func (m *triggers) UnregisterAndTrigger(req Request) {
m.lock.Lock()
defer m.lock.Unlock()

remainingMatchers := map[schema.GroupVersionKind]map[enqueueTarget][]objectMatcher{}
remainingMatchers := map[schema.GroupVersionKind]map[enqueueTarget]map[string]objectMatcher{}

for targetGVK, matchers := range m.matchers {
for target, mts := range matchers {
Expand All @@ -119,9 +124,12 @@ func (m *triggers) UnregisterAndTrigger(req Request) {
if targetGVK != req.GVK || mt.Namespace != req.Namespace || mt.Name != req.Name {
// If the matcher matches the deleted object exactly, then skip the matcher.
if remainingMatchers[targetGVK] == nil {
remainingMatchers[targetGVK] = make(map[enqueueTarget][]objectMatcher)
remainingMatchers[targetGVK] = make(map[enqueueTarget]map[string]objectMatcher)
}
if remainingMatchers[targetGVK][target] == nil {
remainingMatchers[targetGVK][target] = make(map[string]objectMatcher)
}
remainingMatchers[targetGVK][target] = append(remainingMatchers[targetGVK][target], mt)
remainingMatchers[targetGVK][target][mt.String()] = mt
}
if targetGVK == req.GVK && mt.Match(req.Namespace, req.Name, req.Object) {
log.Debugf("Triggering [%s] [%v] from [%s] [%v] on delete", target.key, target.gvk, req.Key, req.GVK)
Expand Down
4 changes: 2 additions & 2 deletions pkg/runtime/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/obot-platform/nah/pkg/backend"
"github.com/obot-platform/nah/pkg/fields"
"github.com/obot-platform/nah/pkg/router"
"github.com/obot-platform/nah/pkg/uncached"
"github.com/obot-platform/nah/pkg/untriggered"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
kcache "k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -167,7 +167,7 @@ func (b *Backend) IsObjectNamespaced(obj runtime.Object) (bool, error) {
}

func (b *Backend) GVKForObject(obj runtime.Object, scheme *runtime.Scheme) (schema.GroupVersionKind, error) {
return apiutil.GVKForObject(uncached.Unwrap(obj), scheme)
return apiutil.GVKForObject(untriggered.Unwrap(obj), scheme)
}

func (b *Backend) IndexField(ctx context.Context, obj kclient.Object, field string, extractValue kclient.IndexerFunc) error {
Expand Down
58 changes: 41 additions & 17 deletions pkg/runtime/cached.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sync"
"time"

"github.com/obot-platform/nah/pkg/uncached"
"github.com/obot-platform/nah/pkg/untriggered"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -115,8 +115,11 @@ func (c *cacheClient) store(obj kclient.Object) {
}

func (c *cacheClient) Get(ctx context.Context, key kclient.ObjectKey, obj kclient.Object, opts ...kclient.GetOption) error {
if u, ok := obj.(*uncached.Holder); ok {
return c.uncached.Get(ctx, key, u.Object, opts...)
if u, ok := obj.(*untriggered.Holder); ok {
obj = u.Object
if u.IsUncached() {
return c.uncached.Get(ctx, key, obj, opts...)
}
}

getErr := c.cached.Get(ctx, key, obj)
Expand Down Expand Up @@ -155,15 +158,21 @@ func (c *cacheClient) Get(ctx context.Context, key kclient.ObjectKey, obj kclien
}

func (c *cacheClient) List(ctx context.Context, list kclient.ObjectList, opts ...kclient.ListOption) error {
if u, ok := list.(*uncached.HolderList); ok {
return c.uncached.List(ctx, u.ObjectList, opts...)
if u, ok := list.(*untriggered.HolderList); ok {
list = u.ObjectList
if u.IsUncached() {
return c.uncached.List(ctx, u, opts...)
}
}
return c.cached.List(ctx, list, opts...)
}

func (c *cacheClient) Create(ctx context.Context, obj kclient.Object, opts ...kclient.CreateOption) error {
if u, ok := obj.(*uncached.Holder); ok {
return c.uncached.Create(ctx, u.Object, opts...)
if u, ok := obj.(*untriggered.Holder); ok {
obj = u.Object
if u.IsUncached() {
return c.uncached.Create(ctx, obj, opts...)
}
}
err := c.cached.Create(ctx, obj, opts...)
if err != nil {
Expand All @@ -174,8 +183,11 @@ func (c *cacheClient) Create(ctx context.Context, obj kclient.Object, opts ...kc
}

func (c *cacheClient) Delete(ctx context.Context, obj kclient.Object, opts ...kclient.DeleteOption) error {
if u, ok := obj.(*uncached.Holder); ok {
return c.uncached.Delete(ctx, u.Object, opts...)
if u, ok := obj.(*untriggered.Holder); ok {
obj = u.Object
if u.IsUncached() {
return c.uncached.Delete(ctx, obj, opts...)
}
}
err := c.cached.Delete(ctx, obj, opts...)
if err != nil {
Expand All @@ -186,8 +198,11 @@ func (c *cacheClient) Delete(ctx context.Context, obj kclient.Object, opts ...kc
}

func (c *cacheClient) Update(ctx context.Context, obj kclient.Object, opts ...kclient.UpdateOption) error {
if u, ok := obj.(*uncached.Holder); ok {
return c.uncached.Update(ctx, u.Object, opts...)
if u, ok := obj.(*untriggered.Holder); ok {
obj = u.Object
if u.IsUncached() {
return c.uncached.Update(ctx, obj, opts...)
}
}
err := c.cached.Update(ctx, obj, opts...)
if err != nil {
Expand All @@ -198,8 +213,11 @@ func (c *cacheClient) Update(ctx context.Context, obj kclient.Object, opts ...kc
}

func (c *cacheClient) Patch(ctx context.Context, obj kclient.Object, patch kclient.Patch, opts ...kclient.PatchOption) error {
if u, ok := obj.(*uncached.Holder); ok {
return c.uncached.Patch(ctx, u.Object, patch, opts...)
if u, ok := obj.(*untriggered.Holder); ok {
obj = u.Object
if u.IsUncached() {
return c.uncached.Patch(ctx, obj, patch, opts...)
}
}
err := c.cached.Patch(ctx, obj, patch, opts...)
if err != nil {
Expand All @@ -210,6 +228,12 @@ func (c *cacheClient) Patch(ctx context.Context, obj kclient.Object, patch kclie
}

func (c *cacheClient) DeleteAllOf(ctx context.Context, obj kclient.Object, opts ...kclient.DeleteAllOfOption) error {
if u, ok := obj.(*untriggered.Holder); ok {
obj = u.Object
if u.IsUncached() {
return c.uncached.DeleteAllOf(ctx, obj, opts...)
}
}
return c.cached.DeleteAllOf(ctx, obj, opts...)
}

Expand Down Expand Up @@ -248,15 +272,15 @@ type subResourceClient struct {
}

func (s *subResourceClient) Get(ctx context.Context, obj kclient.Object, subResource kclient.Object, opts ...kclient.SubResourceGetOption) error {
return s.reader.Get(ctx, uncached.Unwrap(obj).(kclient.Object), subResource, opts...)
return s.reader.Get(ctx, untriggered.Unwrap(obj).(kclient.Object), subResource, opts...)
}

func (s *subResourceClient) Create(ctx context.Context, obj kclient.Object, subResource kclient.Object, opts ...kclient.SubResourceCreateOption) error {
return s.writer.Create(ctx, uncached.Unwrap(obj).(kclient.Object), subResource, opts...)
return s.writer.Create(ctx, untriggered.Unwrap(obj).(kclient.Object), subResource, opts...)
}

func (s *subResourceClient) Update(ctx context.Context, obj kclient.Object, opts ...kclient.SubResourceUpdateOption) error {
err := s.writer.Update(ctx, uncached.Unwrap(obj).(kclient.Object), opts...)
err := s.writer.Update(ctx, untriggered.Unwrap(obj).(kclient.Object), opts...)
if err != nil {
return err
}
Expand All @@ -265,7 +289,7 @@ func (s *subResourceClient) Update(ctx context.Context, obj kclient.Object, opts
}

func (s *subResourceClient) Patch(ctx context.Context, obj kclient.Object, patch kclient.Patch, opts ...kclient.SubResourcePatchOption) error {
err := s.writer.Patch(ctx, uncached.Unwrap(obj).(kclient.Object), patch, opts...)
err := s.writer.Patch(ctx, untriggered.Unwrap(obj).(kclient.Object), patch, opts...)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/runtime/multi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"fmt"

"github.com/obot-platform/nah/pkg/uncached"
"github.com/obot-platform/nah/pkg/untriggered"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -155,7 +155,7 @@ func (m multiClient) Watch(ctx context.Context, obj kclient.ObjectList, opts ...
}

func (m multiClient) getClient(obj runtime.Object) (kclient.WithWatch, error) {
gvk, err := m.GroupVersionKindFor(uncached.Unwrap(obj))
gvk, err := m.GroupVersionKindFor(untriggered.Unwrap(obj))
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit ed22001

Please sign in to comment.