Skip to content
This repository was archived by the owner on Mar 16, 2024. It is now read-only.

Commit

Permalink
Merge pull request kubernetes#108414 from aojea/cacher_context
Browse files Browse the repository at this point in the history
cacher: don't accept requests if stopped
  • Loading branch information
k8s-ci-robot authored May 11, 2022
2 parents c0f48d2 + 2cb3a56 commit 999b1bb
Show file tree
Hide file tree
Showing 4 changed files with 299 additions and 52 deletions.
50 changes: 13 additions & 37 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,9 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return nil, err
}

c.ready.wait()
if err := c.ready.wait(); err != nil {
return nil, errors.NewServiceUnavailable(err.Error())
}

triggerValue, triggerSupported := "", false
if c.indexedTrigger != nil {
Expand Down Expand Up @@ -559,7 +561,9 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o

// Do not create a trace - it's not for free and there are tons
// of Get requests. We can add it if it will be really needed.
c.ready.wait()
if err := c.ready.wait(); err != nil {
return errors.NewServiceUnavailable(err.Error())
}

objVal, err := conversion.EnforcePtr(objPtr)
if err != nil {
Expand Down Expand Up @@ -644,7 +648,9 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
trace := utiltrace.New("cacher list", utiltrace.Field{Key: "type", Value: c.objectType.String()})
defer trace.LogIfLong(500 * time.Millisecond)

c.ready.wait()
if err := c.ready.wait(); err != nil {
return errors.NewServiceUnavailable(err.Error())
}
trace.Step("Ready")

// List elements with at least 'listRV' from cache.
Expand Down Expand Up @@ -1011,6 +1017,7 @@ func (c *Cacher) Stop() {
return
}
c.stopped = true
c.ready.stop()
c.stopLock.Unlock()
close(c.stopCh)
c.stopWg.Wait()
Expand Down Expand Up @@ -1040,7 +1047,9 @@ func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWit

// LastSyncResourceVersion returns resource version to which the underlying cache is synced.
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
c.ready.wait()
if err := c.ready.wait(); err != nil {
return 0, errors.NewServiceUnavailable(err.Error())
}

resourceVersion := c.reflector.LastSyncResourceVersion()
return c.versioner.ParseResourceVersion(resourceVersion)
Expand Down Expand Up @@ -1426,36 +1435,3 @@ func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) {
}
}
}

type ready struct {
ok bool
c *sync.Cond
}

func newReady() *ready {
return &ready{c: sync.NewCond(&sync.RWMutex{})}
}

func (r *ready) wait() {
r.c.L.Lock()
for !r.ok {
r.c.Wait()
}
r.c.L.Unlock()
}

// TODO: Make check() function more sophisticated, in particular
// allow it to behave as "waitWithTimeout".
func (r *ready) check() bool {
rwMutex := r.c.L.(*sync.RWMutex)
rwMutex.RLock()
defer rwMutex.RUnlock()
return r.ok
}

func (r *ready) set(ok bool) {
r.c.L.Lock()
defer r.c.L.Unlock()
r.ok = ok
r.c.Broadcast()
}
122 changes: 107 additions & 15 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,9 @@ func TestGetListCacheBypass(t *testing.T) {
result := &example.PodList{}

// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.err = errDummy
Expand Down Expand Up @@ -374,7 +376,9 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
result := &example.PodList{}

// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.err = errDummy
Expand Down Expand Up @@ -406,7 +410,9 @@ func TestGetCacheBypass(t *testing.T) {
result := &example.Pod{}

// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.err = errDummy
Expand Down Expand Up @@ -436,7 +442,9 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

// Ensure there is some budget for slowing down processing.
cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond)
Expand Down Expand Up @@ -561,7 +569,9 @@ func TestCacheWatcherStoppedOnDestroy(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
Expand Down Expand Up @@ -591,6 +601,68 @@ func TestCacheWatcherStoppedOnDestroy(t *testing.T) {

}

func TestCacheDontAcceptRequestsStopped(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}

// Wait until cacher is initialized.
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}

watchClosed := make(chan struct{})
go func() {
defer close(watchClosed)
for event := range w.ResultChan() {
switch event.Type {
case watch.Added, watch.Modified, watch.Deleted:
// ok
default:
t.Errorf("unexpected event %#v", event)
}
}
}()

cacher.Stop()

_, err = cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err == nil {
t.Fatalf("Success to create Watch: %v", err)
}

result := &example.Pod{}
err = cacher.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
IgnoreNotFound: true,
ResourceVersion: "1",
}, result)
if err == nil {
t.Fatalf("Success to create Get: %v", err)
}

err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{
ResourceVersion: "1",
Recursive: true,
}, result)
if err == nil {
t.Fatalf("Success to create GetList: %v", err)
}

select {
case <-watchClosed:
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out waiting for watch to close")
}

}

func TestTimeBucketWatchersBasic(t *testing.T) {
filter := func(_ string, _ labels.Set, _ fields.Set) bool {
return true
Expand Down Expand Up @@ -642,7 +714,9 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
pred := storage.Everything
pred.AllowWatchBookmarks = true

Expand Down Expand Up @@ -738,7 +812,9 @@ func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBoo
defer cacher.Stop()

// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
pred := storage.Everything
pred.AllowWatchBookmarks = allowWatchBookmarks

Expand Down Expand Up @@ -836,7 +912,9 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
cacher.bookmarkWatchers.bookmarkFrequency = time.Second

// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
pred := storage.Everything
pred.AllowWatchBookmarks = true

Expand Down Expand Up @@ -904,7 +982,9 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

// Ensure there is some budget for slowing down processing.
cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond)
Expand Down Expand Up @@ -980,7 +1060,9 @@ func TestBookmarksOnResourceVersionUpdates(t *testing.T) {
cacher.bookmarkWatchers = newTimeBucketWatchers(clock.RealClock{}, 2*time.Second)

// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

makePod := func(i int) *examplev1.Pod {
return &examplev1.Pod{
Expand Down Expand Up @@ -1056,7 +1138,9 @@ func TestStartingResourceVersion(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

// Ensure there is some budget for slowing down processing.
// We use the fakeTimeBudget to prevent this test from flaking under
Expand Down Expand Up @@ -1134,7 +1218,9 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

// Ensure there is some budget for slowing down processing.
// We use the fakeTimeBudget to prevent this test from flaking under
Expand Down Expand Up @@ -1243,7 +1329,9 @@ func TestCachingDeleteEvents(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

fooPredicate := storage.SelectionPredicate{
Label: labels.SelectorFromSet(map[string]string{"foo": "true"}),
Expand Down Expand Up @@ -1323,7 +1411,9 @@ func testCachingObjects(t *testing.T, watchersCount int) {
defer cacher.Stop()

// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

dispatchedEvents := []*watchCacheEvent{}
cacher.watchCache.eventHandler = func(event *watchCacheEvent) {
Expand Down Expand Up @@ -1417,7 +1507,9 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) {
defer cacher.Stop()

// Wait until cacher is initialized.
cacher.ready.wait()
if err := cacher.ready.wait(); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
// Ensure there is enough budget for slow processing since
// the entire watch cache is going to be served through the
// interval and events won't be popped from the cacheWatcher's
Expand Down
Loading

0 comments on commit 999b1bb

Please sign in to comment.