From 8d3db09acbc4c7c4383a365df54fdbacdc15a4cd Mon Sep 17 00:00:00 2001 From: Valerian Roche Date: Mon, 5 Feb 2024 12:42:09 -0500 Subject: [PATCH] [Sotw][Linear cache] Ensure watches are properly considering subscription changes to not miss new resources (#10) * Fix linear tests to properly track subscription and adapt to the new version handling. Ensure we always reply with all resources when version is not set or the version prefix does not match our cache * Add tests for sotw linear watches * Fix comments and linting Signed-off-by: Valerian Roche --------- Signed-off-by: Valerian Roche --- pkg/cache/v3/cache.go | 25 ++ pkg/cache/v3/delta_test.go | 11 +- pkg/cache/v3/linear.go | 315 ++++++++++----- pkg/cache/v3/linear_test.go | 567 +++++++++++++++++++++++---- pkg/cache/v3/simple.go | 109 ++--- pkg/cache/v3/simple_test.go | 127 +++--- pkg/cache/v3/status.go | 3 + pkg/server/sotw/v3/server.go | 12 +- pkg/server/stream/v3/subscription.go | 15 + 9 files changed, 867 insertions(+), 317 deletions(-) diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index 16d464d444..a130e3e5b6 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -119,6 +119,11 @@ type Response interface { // Get the version in the Response. GetVersion() (string, error) + // GetReturnedResources returns the map of resources and their versions returned in the subscription. + // It may include more resources than directly set in the response to consider the full state of the client. + // The caller is expected to provide this unchanged to the next call to CreateWatch as part of the subscription. + GetReturnedResources() map[string]string + // Get the context provided during response creation. GetContext() context.Context } @@ -156,6 +161,12 @@ type RawResponse struct { // Resources to be included in the response. 
Resources []types.ResourceWithTTL + // ReturnedResources tracks the resources returned for the subscription and the version when it was last returned, + // including previously returned ones when using non-full state resources. + // It allows the cache to know what the client knows. The server will transparently forward this + // across requests, and the cache is responsible for its interpretation. + ReturnedResources map[string]string + // Whether this is a heartbeat response. For xDS versions that support TTL, this // will be converted into a response that doesn't contain the actual resource protobuf. // This allows for more lightweight updates that server only to update the TTL timer. @@ -208,6 +219,12 @@ type PassthroughResponse struct { DiscoveryResponse *discovery.DiscoveryResponse ctx context.Context + + // ReturnedResources tracks the resources returned for the subscription and the version when it was last returned, + // including previously returned ones when using non-full state resources. + // It allows the cache to know what the client knows. The server will transparently forward this + // across requests, and the cache is responsible for its interpretation. + ReturnedResources map[string]string } // DeltaPassthroughResponse is a pre constructed xDS response that need not go through marshaling transformations. @@ -265,6 +282,10 @@ func (r *RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, erro return marshaledResponse.(*discovery.DiscoveryResponse), nil } +func (r *RawResponse) GetReturnedResources() map[string]string { + return r.ReturnedResources +} + // GetDeltaDiscoveryResponse performs the marshaling the first time its called and uses the cached response subsequently. // We can do this because the marshaled response does not change across the calls. // This caching behavior is important in high throughput scenarios because grpc marshaling has a cost and it drives the cpu utilization under load. 
@@ -368,6 +389,10 @@ func (r *PassthroughResponse) GetDiscoveryResponse() (*discovery.DiscoveryRespon return r.DiscoveryResponse, nil } +func (r *PassthroughResponse) GetReturnedResources() map[string]string { + return r.ReturnedResources +} + // GetDeltaDiscoveryResponse returns the final passthrough Delta Discovery Response. func (r *DeltaPassthroughResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscoveryResponse, error) { return r.DeltaDiscoveryResponse, nil diff --git a/pkg/cache/v3/delta_test.go b/pkg/cache/v3/delta_test.go index 9877330c88..c98c06cfb4 100644 --- a/pkg/cache/v3/delta_test.go +++ b/pkg/cache/v3/delta_test.go @@ -16,6 +16,7 @@ import ( discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/log" rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" @@ -30,7 +31,7 @@ func assertResourceMapEqual(t *testing.T, want, got map[string]types.Resource) { } func TestSnapshotCacheDeltaWatch(t *testing.T) { - c := cache.NewSnapshotCache(false, group{}, logger{t: t}) + c := cache.NewSnapshotCache(false, group{}, log.NewTestLogger(t)) watches := make(map[string]chan cache.DeltaResponse) subscriptions := make(map[string]stream.Subscription) @@ -118,7 +119,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { } func TestDeltaRemoveResources(t *testing.T) { - c := cache.NewSnapshotCache(false, group{}, logger{t: t}) + c := cache.NewSnapshotCache(false, group{}, log.NewTestLogger(t)) watches := make(map[string]chan cache.DeltaResponse) subscriptions := make(map[string]*stream.Subscription) @@ -198,7 +199,7 @@ func TestDeltaRemoveResources(t *testing.T) { } func TestConcurrentSetDeltaWatch(t *testing.T) { - c := cache.NewSnapshotCache(false, group{}, 
logger{t: t}) + c := cache.NewSnapshotCache(false, group{}, log.NewTestLogger(t)) for i := 0; i < 50; i++ { version := fmt.Sprintf("v%d", i) func(i int) { @@ -235,7 +236,7 @@ func TestConcurrentSetDeltaWatch(t *testing.T) { type testKey struct{} func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) { - c := cache.NewSnapshotCache(true, group{}, logger{t: t}) + c := cache.NewSnapshotCache(true, group{}, log.NewTestLogger(t)) // Create a non-buffered channel that will block sends. watchCh := make(chan cache.DeltaResponse) @@ -280,7 +281,7 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) { } func TestSnapshotCacheDeltaWatchCancel(t *testing.T) { - c := cache.NewSnapshotCache(true, group{}, logger{t: t}) + c := cache.NewSnapshotCache(true, group{}, log.NewTestLogger(t)) for _, typ := range testTypes { responses := make(chan cache.DeltaResponse, 1) cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index 6d9d3793ce..fa65e9192f 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -27,8 +27,6 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/log" ) -type watches = map[ResponseWatch]struct{} - // LinearCache supports collections of opaque resources. This cache has a // single collection indexed by resource names and manages resource versions // internally. It implements the cache interface for a single type URL and @@ -41,9 +39,13 @@ type LinearCache struct { resources map[string]types.Resource // Watches open by clients, indexed by resource name. Whenever resources // are changed, the watch is triggered. - watches map[string]watches - // Set of watches for all resources in the collection - watchAll watches + watches map[string]map[int64]ResponseWatch + // Set of watches for all resources in the collection, indexed by watch id. 
+ // watch id is unique for sotw watches and is used to index them without requiring + // the watch itself to be hashable, as well as making logs easier to correlate. + watchAll map[int64]ResponseWatch + // Continuously incremented counter used to index sotw watches. + sotwWatchCount int64 // Set of delta watches. A delta watch always contain the list of subscribed resources // together with its current version // version and versionPrefix fields are ignored for delta watches, because we always generate the resource version. @@ -58,7 +60,7 @@ type LinearCache struct { // Version prefix to be sent to the clients versionPrefix string // Versions for each resource by name. - versionVector map[string]uint64 + versionVector map[string]string log log.Logger @@ -83,9 +85,6 @@ func WithVersionPrefix(prefix string) LinearCacheOption { func WithInitialResources(resources map[string]types.Resource) LinearCacheOption { return func(cache *LinearCache) { cache.resources = resources - for name := range resources { - cache.versionVector[name] = 0 - } } } @@ -100,74 +99,184 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache { out := &LinearCache{ typeURL: typeURL, resources: make(map[string]types.Resource), - watches: make(map[string]watches), - watchAll: make(watches), + watches: make(map[string]map[int64]ResponseWatch), + watchAll: make(map[int64]ResponseWatch), deltaWatches: make(map[int64]DeltaResponseWatch), versionMap: nil, version: 0, - versionVector: make(map[string]uint64), + versionVector: make(map[string]string), log: log.NewDefaultLogger(), } for _, opt := range opts { opt(out) } + for name := range out.resources { + out.versionVector[name] = out.getVersion() + } return out } -func (cache *LinearCache) respond(watch ResponseWatch, staleResources []string) { +func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, ignoreReturnedResources bool) *RawResponse { + var changedResources []string + var removedResources []string + + 
knownVersions := watch.subscription.ReturnedResources() + if ignoreReturnedResources { + // The response will include all resources, with no regards of resources potentially already returned. + knownVersions = make(map[string]string) + } + + if watch.subscription.IsWildcard() { + for resourceName, version := range cache.versionVector { + knownVersion, ok := knownVersions[resourceName] + if !ok { + // This resource is not yet known by the client (new resource added in the cache or newly subscribed). + changedResources = append(changedResources, resourceName) + } else if knownVersion != version { + // The client knows an outdated version. + changedResources = append(changedResources, resourceName) + } + } + + // Negative check to identify resources that have been removed in the cache. + // Sotw does not support returning "deletions", but in the case of full state resources + // a response must then be returned. + for resourceName := range knownVersions { + if _, ok := cache.versionVector[resourceName]; !ok { + removedResources = append(removedResources, resourceName) + } + } + } else { + for resourceName := range watch.subscription.SubscribedResources() { + version, exists := cache.versionVector[resourceName] + knownVersion, known := knownVersions[resourceName] + if !exists { + if known { + // This resource was removed from the cache. If the type requires full state + // we need to return a response. + removedResources = append(removedResources, resourceName) + } + continue + } + + if !known { + // This resource is not yet known by the client (new resource added in the cache or newly subscribed). + changedResources = append(changedResources, resourceName) + } else if knownVersion != version { + // The client knows an outdated version. 
+ changedResources = append(changedResources, resourceName) + } + } + + for resourceName := range knownVersions { + // If the subscription no longer watches a resource, + // we mark it as unknown on the client side to ensure it will be resent to the client if subscribing again later on. + if _, ok := watch.subscription.SubscribedResources()[resourceName]; !ok { + removedResources = append(removedResources, resourceName) + } + } + } + + if len(changedResources) == 0 && len(removedResources) == 0 && !ignoreReturnedResources { + // Nothing changed. + return nil + } + + returnedVersions := make(map[string]string, len(watch.subscription.ReturnedResources())) + // Clone the current returned versions. The cache should not alter the subscription + for resourceName, version := range watch.subscription.ReturnedResources() { + returnedVersions[resourceName] = version + } + + cacheVersion := cache.getVersion() var resources []types.ResourceWithTTL - // TODO: optimize the resources slice creations across different clients - if len(staleResources) == 0 { - // Wildcard case, we return all resources in the cache + + switch { + // Depending on the type, the response will only include changed resources or all of them + case !ResourceRequiresFullStateInSotw(cache.typeURL): + // changedResources is already filtered based on the subscription. + // TODO(valerian-roche): if the only change is a removal in the subscription, + // or a watched resource getting deleted, this might send an empty reply. + // While this does not violate the protocol, we might want to avoid it. + resources = make([]types.ResourceWithTTL, 0, len(changedResources)) + for _, resourceName := range changedResources { + resources = append(resources, types.ResourceWithTTL{Resource: cache.resources[resourceName]}) + returnedVersions[resourceName] = cache.versionVector[resourceName] + } + case watch.subscription.IsWildcard(): + // Include all resources for the type. 
resources = make([]types.ResourceWithTTL, 0, len(cache.resources)) - for _, resource := range cache.resources { + for resourceName, resource := range cache.resources { resources = append(resources, types.ResourceWithTTL{Resource: resource}) + returnedVersions[resourceName] = cache.versionVector[resourceName] } - } else if ResourceRequiresFullStateInSotw(cache.typeURL) { - // Non-wildcard request for a type requiring full state response - // We need to return all requested resources, if existing, for this type - requestedResources := watch.Request.GetResourceNames() + default: + // Include all resources matching the subscription, with no concern on whether + // it has been updated or not. + requestedResources := watch.subscription.SubscribedResources() + // The linear cache could be very large (e.g. containing all potential CLAs) + // Therefore drives on the subscription requested resources. resources = make([]types.ResourceWithTTL, 0, len(requestedResources)) - for _, resource := range requestedResources { - resource := cache.resources[resource] - if resource != nil { - resources = append(resources, types.ResourceWithTTL{Resource: resource}) - } - } - } else { - // Non-wildcard request for other types. Only return stale resources - resources = make([]types.ResourceWithTTL, 0, len(staleResources)) - for _, name := range staleResources { - resource := cache.resources[name] - if resource != nil { - resources = append(resources, types.ResourceWithTTL{Resource: resource}) + for resourceName := range requestedResources { + resource, ok := cache.resources[resourceName] + if !ok { + continue } + resources = append(resources, types.ResourceWithTTL{Resource: resource}) + returnedVersions[resourceName] = cache.versionVector[resourceName] } } - watch.Response <- &RawResponse{ - Request: watch.Request, - Resources: resources, - Version: cache.getVersion(), - Ctx: context.Background(), + + // Cleanup resources no longer existing in the cache or no longer subscribed. 
+ // In sotw we cannot return those if not full state, + // but this ensures we detect unsubscription then resubscription. + for _, resourceName := range removedResources { + delete(returnedVersions, resourceName) + } + + if !ignoreReturnedResources && !ResourceRequiresFullStateInSotw(cache.typeURL) && len(resources) == 0 { + // If the request is not the initial one, and the type does not require full updates, + // do not return if nothing is to be sent + // For full-state resources an empty response does have a semantic meaning + return nil + } + + return &RawResponse{ + Request: watch.Request, + Resources: resources, + ReturnedResources: returnedVersions, + Version: cacheVersion, + Ctx: context.Background(), } } func (cache *LinearCache) notifyAll(modified map[string]struct{}) { - // de-duplicate watches that need to be responded - notifyList := make(map[ResponseWatch][]string) + // Gather the list of non-wildcard watches impacted by the modified resources. + watches := make(map[int64]ResponseWatch) for name := range modified { - for watch := range cache.watches[name] { - notifyList[watch] = append(notifyList[watch], name) + for watchID, watch := range cache.watches[name] { + watches[watchID] = watch } } - for watch, stale := range notifyList { - cache.removeWatch(watch) - cache.respond(watch, stale) + for watchID, watch := range watches { + response := cache.computeSotwResponse(watch, false) + if response != nil { + watch.Response <- response + cache.removeWatch(watchID, watch.subscription) + } else { + cache.log.Warnf("[Linear cache] Watch %d detected as triggered did not get notified", watchID) + } } - for watch := range cache.watchAll { - cache.respond(watch, nil) + + for watchID, watch := range cache.watchAll { + response := cache.computeSotwResponse(watch, false) + if response != nil { + watch.Response <- response + delete(cache.watchAll, watchID) + } else { + cache.log.Warnf("[Linear cache] Watch %d detected as triggered did not get notified", watchID) + } }
- cache.watchAll = make(watches) // Building the version map has a very high cost when using SetResources to do full updates. // As it is only used with delta watches, it is only maintained when applicable. @@ -218,7 +327,7 @@ func (cache *LinearCache) UpdateResource(name string, res types.Resource) error defer cache.mu.Unlock() cache.version++ - cache.versionVector[name] = cache.version + cache.versionVector[name] = cache.getVersion() cache.resources[name] = res // TODO: batch watch closures to prevent rapid updates @@ -249,10 +358,11 @@ func (cache *LinearCache) UpdateResources(toUpdate map[string]types.Resource, to defer cache.mu.Unlock() cache.version++ + cacheVersion := cache.getVersion() modified := make(map[string]struct{}, len(toUpdate)+len(toDelete)) for name, resource := range toUpdate { - cache.versionVector[name] = cache.version + cache.versionVector[name] = cacheVersion cache.resources[name] = resource modified[name] = struct{}{} } @@ -275,6 +385,7 @@ func (cache *LinearCache) SetResources(resources map[string]types.Resource) { defer cache.mu.Unlock() cache.version++ + cacheVersion := cache.getVersion() modified := map[string]struct{}{} // Collect deleted resource names. @@ -291,7 +402,7 @@ func (cache *LinearCache) SetResources(resources map[string]types.Resource) { // We assume all resources passed to SetResources are changed. 
// Otherwise we would have to do proto.Equal on resources which is pretty expensive operation for name := range resources { - cache.versionVector[name] = cache.version + cache.versionVector[name] = cacheVersion modified[name] = struct{}{} } @@ -312,90 +423,86 @@ func (cache *LinearCache) GetResources() map[string]types.Resource { return resources } -func (cache *LinearCache) CreateWatch(request *Request, _ Subscription, value chan Response) (func(), error) { +func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value chan Response) (func(), error) { if request.GetTypeUrl() != cache.typeURL { return nil, fmt.Errorf("request type %s does not match cache type %s", request.GetTypeUrl(), cache.typeURL) } - // If the version is not up to date, check whether any requested resource has - // been updated between the last version and the current version. This avoids the problem - // of sending empty updates whenever an irrelevant resource changes. - stale := false - staleResources := []string{} // empty means all - - // strip version prefix if it is present - var lastVersion uint64 - var err error - if strings.HasPrefix(request.GetVersionInfo(), cache.versionPrefix) { - lastVersion, err = strconv.ParseUint(request.GetVersionInfo()[len(cache.versionPrefix):], 0, 64) - } else { - err = errors.New("mis-matched version prefix") + + // If the request does not include a version the client considers it has no current state. + // In this case we will always reply to allow proper initialization of dependencies in the client. + ignoreCurrentSubscriptionResources := request.GetVersionInfo() == "" + if !strings.HasPrefix(request.GetVersionInfo(), cache.versionPrefix) { + // If the version of the request does not match the cache prefix, we will send a response in all cases to match the legacy behavior. + ignoreCurrentSubscriptionResources = true + cache.log.Debugf("[linear cache] received watch with version %s not matching the cache prefix %s. 
Will return all known resources", request.GetVersionInfo(), cache.versionPrefix) } - watch := ResponseWatch{Request: request, Response: value} + // A major difference between delta and sotw is the ability to not resend everything when connecting to a new control-plane + // In delta the request provides the version of the resources it does know, even if the request is wildcard or does request more resources + // In sotw the request only provides the global version of the control-plane, and there is no way for the control-plane to know if resources have + // been added since in the requested resources. In the context of generalized wildcard, even wildcard could be new, and taking the assumption + // that wildcard implies that the client already knows all resources at the given version is no longer true. + // We could optimize the reconnection case here if: + // - we take the assumption that clients will not start requesting wildcard while providing a version. We could then ignore requests providing the resources. + // - we use the version as some form of hash of resources known, and we can then consider it as a way to correctly verify whether all resources are unchanged. + // For now it is not done as: + // - for the first case, while the protocol documentation does not explicitly mention the case, it does not mark it impossible and explicitly references unsubscribing from wildcard. + // - for the second one we could likely do it with little difficulty if need be, but if users rely on the current monotonic version it could impact their callbacks implementations. 
+ watch := ResponseWatch{Request: request, Response: value, subscription: sub} cache.mu.Lock() defer cache.mu.Unlock() - switch { - case err != nil: - stale = true - staleResources = request.GetResourceNames() - cache.log.Debugf("Watch is stale as version failed to parse %s", err.Error()) - case len(request.GetResourceNames()) == 0: - stale = (lastVersion != cache.version) - if stale { - cache.log.Debugf("Watch is stale as cache version %d differs for wildcard watch %d", cache.version, lastVersion) - } - default: - for _, name := range request.GetResourceNames() { - // When a resource is removed, its version defaults 0 and it is not considered stale. - if lastVersion < cache.versionVector[name] { - stale = true - staleResources = append(staleResources, name) - } - } - if stale { - cache.log.Debugf("Watch is stale with stale resources %v", staleResources) - } - } - if stale { - cache.respond(watch, staleResources) - return nil, nil + response := cache.computeSotwResponse(watch, ignoreCurrentSubscriptionResources) + if response != nil { + cache.log.Debugf("replying to the watch with resources %v (subscription values %v, known %v)", response.GetReturnedResources(), sub.SubscribedResources(), sub.ReturnedResources()) + watch.Response <- response + return func() {}, nil } + + watchID := cache.nextSotwWatchID() // Create open watches since versions are up to date. 
- if len(request.GetResourceNames()) == 0 { - cache.log.Infof("[linear cache] open watch for %s all resources, system version %q", cache.typeURL, cache.getVersion()) - cache.watchAll[watch] = struct{}{} + if sub.IsWildcard() { + cache.log.Infof("[linear cache] open watch %d for %s all resources, system version %q", watchID, cache.typeURL, cache.getVersion()) + cache.watchAll[watchID] = watch return func() { cache.mu.Lock() defer cache.mu.Unlock() - delete(cache.watchAll, watch) + delete(cache.watchAll, watchID) }, nil } - cache.log.Infof("[linear cache] open watch for %s resources %v, system version %q", cache.typeURL, request.ResourceNames, cache.getVersion()) - for _, name := range request.GetResourceNames() { + cache.log.Infof("[linear cache] open watch %d for %s resources %v, system version %q", watchID, cache.typeURL, sub.SubscribedResources(), cache.getVersion()) + for name := range sub.SubscribedResources() { set, exists := cache.watches[name] if !exists { - set = make(watches) + set = make(map[int64]ResponseWatch) cache.watches[name] = set } - set[watch] = struct{}{} + set[watchID] = watch } return func() { cache.mu.Lock() defer cache.mu.Unlock() - cache.removeWatch(watch) + cache.removeWatch(watchID, watch.subscription) }, nil } +func (cache *LinearCache) nextSotwWatchID() int64 { + next := atomic.AddInt64(&cache.sotwWatchCount, 1) + if next < 0 { + panic("watch id count overflow") + } + return next +} + // Must be called under lock -func (cache *LinearCache) removeWatch(watch ResponseWatch) { +func (cache *LinearCache) removeWatch(watchID int64, sub Subscription) { // Make sure we clean the watch for ALL resources it might be associated with, // as the channel will no longer be listened to - for _, resource := range watch.Request.ResourceNames { + for resource := range sub.SubscribedResources() { resourceWatches := cache.watches[resource] - delete(resourceWatches, watch) + delete(resourceWatches, watchID) if len(resourceWatches) == 0 { 
delete(cache.watches, resource) } diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index f1c8a25613..fdc4fa11fc 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -38,6 +38,28 @@ const ( testType = "google.protobuf.StringValue" ) +type validationContext struct { + expectedType string +} + +func newValidationContext(opts []validateOption) validationContext { + context := validationContext{ + expectedType: testType, + } + for _, opt := range opts { + opt(&context) + } + return context +} + +type validateOption = func(*validationContext) + +func responseType(t string) validateOption { + return func(vo *validationContext) { + vo.expectedType = t + } +} + func testResource(s string) types.Resource { return wrapperspb.String(s) } @@ -48,7 +70,7 @@ func verifyResponseContent(t *testing.T, ch <-chan Response, expectedType, expec select { case r = <-ch: case <-time.After(1 * time.Second): - t.Error("failed to receive response after 1 second") + t.Fatal("failed to receive response after 1 second") return nil, nil } @@ -71,28 +93,26 @@ func verifyResponseContent(t *testing.T, ch <-chan Response, expectedType, expec if out.GetTypeUrl() != expectedType { t.Errorf("unexpected type URL: %q", out.GetTypeUrl()) } - if len(r.GetRequest().GetResourceNames()) != 0 && len(r.GetRequest().GetResourceNames()) < len(out.Resources) { - t.Errorf("received more resources (%d) than requested (%d)", len(r.GetRequest().GetResourceNames()), len(out.Resources)) - } return r, out } -func verifyResponse(t *testing.T, ch <-chan Response, expectedVersion string, expectedResourcesNb int) { +func verifyResponse(t *testing.T, ch <-chan Response, expectedVersion string, expectedResourcesNb int) Response { t.Helper() - _, r := verifyResponseContent(t, ch, testType, expectedVersion) + response, r := verifyResponseContent(t, ch, testType, expectedVersion) if r == nil { - return + return nil } if n := len(r.GetResources()); n != expectedResourcesNb { 
t.Errorf("unexpected number of responses: got %d, want %d", n, expectedResourcesNb) } + return response } -func verifyResponseResources(t *testing.T, ch <-chan Response, expectedType, expectedVersion string, expectedResources ...string) { +func verifyResponseResources(t *testing.T, ch <-chan Response, expectedType, expectedVersion string, expectedResources ...string) Response { t.Helper() r, _ := verifyResponseContent(t, ch, expectedType, expectedVersion) if r == nil { - return + return nil } out := r.(*RawResponse) resourceNames := []string{} @@ -100,6 +120,7 @@ func verifyResponseResources(t *testing.T, ch <-chan Response, expectedType, exp resourceNames = append(resourceNames, GetResourceName(res.Resource)) } assert.ElementsMatch(t, resourceNames, expectedResources) + return r } type resourceInfo struct { @@ -107,11 +128,13 @@ type resourceInfo struct { version string } -func validateDeltaResponse(t *testing.T, resp DeltaResponse, resources []resourceInfo, deleted []string) { +func validateDeltaResponse(t *testing.T, resp DeltaResponse, resources []resourceInfo, deleted []string, options ...validateOption) { t.Helper() - if resp.GetDeltaRequest().GetTypeUrl() != testType { - t.Errorf("unexpected empty request type URL: %q", resp.GetDeltaRequest().GetTypeUrl()) + validationCtx := newValidationContext(options) + + if resp.GetDeltaRequest().GetTypeUrl() != validationCtx.expectedType { + t.Errorf("unexpected request type URL: received %s and expected %s", resp.GetDeltaRequest().GetTypeUrl(), validationCtx.expectedType) } out, err := resp.GetDeltaDiscoveryResponse() if err != nil { @@ -136,8 +159,8 @@ func validateDeltaResponse(t *testing.T, resp DeltaResponse, resources []resourc t.Errorf("resource with name %q not found in response", r.name) } } - if out.GetTypeUrl() != testType { - t.Errorf("unexpected type URL: %q", out.GetTypeUrl()) + if out.GetTypeUrl() != validationCtx.expectedType { + t.Errorf("unexpected type URL: received %s and expected %s", 
out.GetTypeUrl(), validationCtx.expectedType) } if len(out.GetRemovedResources()) != len(deleted) { t.Errorf("unexpected number of removed resurces: got %d, want %d", len(out.GetRemovedResources()), len(deleted)) @@ -156,7 +179,7 @@ func validateDeltaResponse(t *testing.T, resp DeltaResponse, resources []resourc } } -func verifyDeltaResponse(t *testing.T, ch <-chan DeltaResponse, resources []resourceInfo, deleted []string) { +func verifyDeltaResponse(t *testing.T, ch <-chan DeltaResponse, resources []resourceInfo, deleted []string, options ...validateOption) { t.Helper() var r DeltaResponse select { @@ -165,7 +188,7 @@ func verifyDeltaResponse(t *testing.T, ch <-chan DeltaResponse, resources []reso t.Error("timeout waiting for delta response") return } - validateDeltaResponse(t, r, resources, deleted) + validateDeltaResponse(t, r, resources, deleted, options...) } func checkWatchCount(t *testing.T, c *LinearCache, name string, count int) { @@ -244,6 +267,15 @@ func subFromRequest(req *Request) stream.Subscription { return stream.NewSotwSubscription(req.GetResourceNames()) } +// This method represents the expected behavior of client and servers regarding the request and the subscription. 
+// For edge cases it should ignore those +func updateFromSotwResponse(resp Response, sub *stream.Subscription, req *Request) { + sub.SetReturnedResources(resp.GetReturnedResources()) + // Never returns an error when not using passthrough responses + version, _ := resp.GetVersion() + req.VersionInfo = version +} + func subFromDeltaRequest(req *DeltaRequest) stream.Subscription { return stream.NewDeltaSubscription(req.GetResourceNamesSubscribe(), req.GetResourceNamesUnsubscribe(), req.GetInitialResourceVersions()) } @@ -302,18 +334,30 @@ func TestLinearBasic(t *testing.T) { require.NoError(t, c.UpdateResource("a", testResource("a"))) checkWatchCount(t, c, "a", 0) checkWatchCount(t, c, "b", 0) - verifyResponse(t, w1, "1", 1) - verifyResponse(t, w2, "1", 1) + + resp1 := verifyResponse(t, w1, "1", 1) + updateFromSotwResponse(resp1, &sub1, req1) + resp2 := verifyResponse(t, w2, "1", 1) + updateFromSotwResponse(resp2, &sub2, req2) // Request again, should get same response + w1 = make(chan Response, 1) + req1 = &Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"} + sub1 = subFromRequest(req1) _, err = c.CreateWatch(req1, sub1, w1) require.NoError(t, err) checkWatchCount(t, c, "a", 0) - verifyResponse(t, w1, "1", 1) + resp1 = verifyResponse(t, w1, "1", 1) + updateFromSotwResponse(resp1, &sub1, req1) + + w2 = make(chan Response, 1) + req2 = &Request{TypeUrl: testType, VersionInfo: "0"} + sub2 = subFromRequest(req2) _, err = c.CreateWatch(req2, sub2, w2) require.NoError(t, err) checkWatchCount(t, c, "a", 0) - verifyResponse(t, w2, "1", 1) + resp2 = verifyResponse(t, w2, "1", 1) + updateFromSotwResponse(resp2, &sub2, req2) // Add another element and update the first, response should be different require.NoError(t, c.UpdateResource("b", testResource("b"))) @@ -350,15 +394,15 @@ func TestLinearSetResources(t *testing.T) { "a": testResource("a"), "b": testResource("b"), }) - verifyResponse(t, w1, "1", 1) - verifyResponse(t, w2, "1", 2) // the version was 
only incremented once for all resources + resp1 := verifyResponse(t, w1, "1", 1) + updateFromSotwResponse(resp1, &sub1, req1) + resp2 := verifyResponse(t, w2, "1", 2) // the version was only incremented once for all resources + updateFromSotwResponse(resp2, &sub2, req2) // Add another element and update the first, response should be different - req1.VersionInfo = "1" _, err = c.CreateWatch(req1, sub1, w1) require.NoError(t, err) mustBlock(t, w1) - req2.VersionInfo = "1" _, err = c.CreateWatch(req2, sub2, w2) require.NoError(t, err) mustBlock(t, w2) @@ -368,15 +412,15 @@ func TestLinearSetResources(t *testing.T) { "b": testResource("b"), "c": testResource("c"), }) - verifyResponse(t, w1, "2", 1) - verifyResponse(t, w2, "2", 3) + resp1 = verifyResponse(t, w1, "2", 1) + updateFromSotwResponse(resp1, &sub1, req1) + resp2 = verifyResponse(t, w2, "2", 3) + updateFromSotwResponse(resp2, &sub2, req2) // Delete resource - req1.VersionInfo = "2" _, err = c.CreateWatch(req1, sub1, w1) require.NoError(t, err) mustBlock(t, w1) - req2.VersionInfo = "2" _, err = c.CreateWatch(req2, sub2, w2) require.NoError(t, err) mustBlock(t, w2) @@ -385,7 +429,7 @@ func TestLinearSetResources(t *testing.T) { "b": testResource("b"), "c": testResource("c"), }) - verifyResponse(t, w1, "", 0) // removing a resource from the set triggers existing watches for deleted resources + mustBlock(t, w1) // removing a resource from the set does not trigger the watch for non full state resources verifyResponse(t, w2, "3", 2) } @@ -414,16 +458,18 @@ func TestLinearVersionPrefix(t *testing.T) { sub := subFromRequest(req) _, err := c.CreateWatch(req, sub, w) require.NoError(t, err) - verifyResponse(t, w, "instance1-0", 0) + resp := verifyResponse(t, w, "instance1-0", 0) + updateFromSotwResponse(resp, &sub, req) require.NoError(t, c.UpdateResource("a", testResource("a"))) - req.VersionInfo = "instance1-0" _, err = c.CreateWatch(req, sub, w) require.NoError(t, err) - verifyResponse(t, w, "instance1-1", 1) + resp = 
verifyResponse(t, w, "instance1-1", 1) req.VersionInfo = "instance1-1" + sub.SetReturnedResources(resp.GetReturnedResources()) + w = make(chan Response, 1) _, err = c.CreateWatch(req, sub, w) require.NoError(t, err) mustBlock(t, w) @@ -431,32 +477,77 @@ func TestLinearVersionPrefix(t *testing.T) { } func TestLinearDeletion(t *testing.T) { - c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) - w := make(chan Response, 1) - req := &Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"} - sub := subFromRequest(req) - _, err := c.CreateWatch(req, sub, w) - require.NoError(t, err) - mustBlock(t, w) - checkWatchCount(t, c, "a", 1) + t.Run("non full-state resource", func(t *testing.T) { + c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) + w := make(chan Response, 1) + req := &Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"} + sub := subFromRequest(req) + sub.SetReturnedResources(map[string]string{"a": "0"}) + cancel, err := c.CreateWatch(req, sub, w) + require.NoError(t, err) + mustBlock(t, w) + checkWatchCount(t, c, "a", 1) - require.NoError(t, c.DeleteResource("a")) - verifyResponse(t, w, "1", 0) - checkWatchCount(t, c, "a", 0) + require.NoError(t, c.DeleteResource("a")) + // For non full-state type, we don't return on deletion + mustBlock(t, w) - req = &Request{TypeUrl: testType, VersionInfo: "0"} - sub = subFromRequest(req) - _, err = c.CreateWatch(req, sub, w) - require.NoError(t, err) - verifyResponse(t, w, "1", 1) - checkWatchCount(t, c, "b", 0) - require.NoError(t, c.DeleteResource("b")) + cancel() + checkWatchCount(t, c, "a", 0) - req.VersionInfo = "1" - _, err = c.CreateWatch(req, sub, w) - require.NoError(t, err) - verifyResponse(t, w, "2", 0) - checkWatchCount(t, c, "b", 0) + // Create a wildcard watch + req = &Request{TypeUrl: testType, VersionInfo: "0"} + sub = 
subFromRequest(req) + _, err = c.CreateWatch(req, sub, w) + require.NoError(t, err) + resp := verifyResponse(t, w, "1", 1) + updateFromSotwResponse(resp, &sub, req) + checkWatchCount(t, c, "b", 0) + require.NoError(t, c.DeleteResource("b")) + + req.VersionInfo = "1" + _, err = c.CreateWatch(req, sub, w) + require.NoError(t, err) + // b is watched by wildcard, but for non-full-state resources we cannot report deletions + mustBlock(t, w) + assert.Len(t, c.watchAll, 1) + }) + + t.Run("full-state resource", func(t *testing.T) { + c := NewLinearCache(resource.ClusterType, WithInitialResources(map[string]types.Resource{"a": &cluster.Cluster{Name: "a"}, "b": &cluster.Cluster{Name: "b"}})) + w := make(chan Response, 1) + req := &Request{ResourceNames: []string{"a"}, TypeUrl: resource.ClusterType, VersionInfo: "0"} + sub := subFromRequest(req) + sub.SetReturnedResources(map[string]string{"a": "0"}) + _, err := c.CreateWatch(req, sub, w) + require.NoError(t, err) + mustBlock(t, w) + checkWatchCount(t, c, "a", 1) + + require.NoError(t, c.DeleteResource("a")) + // We get a response with no resource as full update and only a was requested + resp := verifyResponseResources(t, w, resource.ClusterType, "1") + updateFromSotwResponse(resp, &sub, req) + checkWatchCount(t, c, "a", 0) + + // New wildcard request + req = &Request{TypeUrl: resource.ClusterType, VersionInfo: "0"} + sub = subFromRequest(req) + _, err = c.CreateWatch(req, sub, w) + require.NoError(t, err) + // b still exists in the cache + resp = verifyResponseResources(t, w, resource.ClusterType, "1", "b") + updateFromSotwResponse(resp, &sub, req) + checkWatchCount(t, c, "b", 0) + require.NoError(t, c.DeleteResource("b")) + + req.VersionInfo = "1" + _, err = c.CreateWatch(req, sub, w) + require.NoError(t, err) + // The cache no longer contains any resource, and as full-state is requested a response is provided + _ = verifyResponseResources(t, w, resource.ClusterType, "2") + checkWatchCount(t, c, "b", 0) + }) } func 
TestLinearWatchTwo(t *testing.T) { @@ -465,6 +556,7 @@ func TestLinearWatchTwo(t *testing.T) { w1 := make(chan Response, 1) req1 := &Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"} sub1 := subFromRequest(req1) + sub1.SetReturnedResources(map[string]string{"a": "0", "b": "0"}) _, err := c.CreateWatch(req1, sub1, w1) require.NoError(t, err) mustBlock(t, w1) @@ -472,6 +564,7 @@ func TestLinearWatchTwo(t *testing.T) { w2 := make(chan Response, 1) req2 := &Request{TypeUrl: testType, VersionInfo: "0"} sub2 := subFromRequest(req2) + sub2.SetReturnedResources(map[string]string{"a": "0", "b": "0"}) _, err = c.CreateWatch(req2, sub2, w2) require.NoError(t, err) mustBlock(t, w2) @@ -479,7 +572,7 @@ func TestLinearWatchTwo(t *testing.T) { require.NoError(t, c.UpdateResource("a", testResource("aa"))) // should only get the modified resource verifyResponse(t, w1, "1", 1) - verifyResponse(t, w2, "1", 2) + verifyResponse(t, w2, "1", 1) } func TestLinearCancel(t *testing.T) { @@ -490,6 +583,7 @@ func TestLinearCancel(t *testing.T) { w1 := make(chan Response, 1) req1 := &Request{TypeUrl: testType, VersionInfo: "1"} sub1 := subFromRequest(req1) + sub1.SetReturnedResources(map[string]string{"a": "1"}) cancel, err := c.CreateWatch(req1, sub1, w1) require.NoError(t, err) mustBlock(t, w1) @@ -516,13 +610,18 @@ func TestLinearCancel(t *testing.T) { require.NoError(t, err) req2 := &Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"} - cancel2, err := c.CreateWatch(req2, subFromRequest(req2), w2) + sub2 := subFromRequest(req2) + cancel2, err := c.CreateWatch(req2, sub2, w2) require.NoError(t, err) req3 := &Request{TypeUrl: testType, VersionInfo: "1"} - cancel3, err := c.CreateWatch(req3, subFromRequest(req3), w3) + sub3 := subFromRequest(req3) + sub3.SetReturnedResources(map[string]string{"a": "1"}) + cancel3, err := c.CreateWatch(req3, sub3, w3) require.NoError(t, err) req4 := &Request{TypeUrl: testType, VersionInfo: "1"} - cancel4, 
err := c.CreateWatch(req4, subFromRequest(req4), w4) + sub4 := subFromRequest(req4) + sub4.SetReturnedResources(map[string]string{"a": "1"}) + cancel4, err := c.CreateWatch(req4, sub4, w4) require.NoError(t, err) mustBlock(t, w1) mustBlock(t, w2) @@ -853,7 +952,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { } func TestLinearMixedWatches(t *testing.T) { - c := NewLinearCache(testType) + c := NewLinearCache(resource.EndpointType, WithLogger(log.NewTestLogger(t))) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} err := c.UpdateResource("a", a) require.NoError(t, err) @@ -864,9 +963,10 @@ func TestLinearMixedWatches(t *testing.T) { assert.Equal(t, 2, c.NumResources()) w := make(chan Response, 1) - sotwReq := &Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()} + sotwReq := &Request{ResourceNames: []string{"a", "b"}, TypeUrl: resource.EndpointType, VersionInfo: c.getVersion()} sotwSub := subFromRequest(sotwReq) - _, err = c.CreateWatch(sotwReq, subFromRequest(sotwReq), w) + sotwSub.SetReturnedResources(map[string]string{"a": "1", "b": "2"}) + _, err = c.CreateWatch(sotwReq, sotwSub, w) require.NoError(t, err) mustBlock(t, w) checkVersionMapNotSet(t, c) @@ -878,7 +978,8 @@ func TestLinearMixedWatches(t *testing.T) { err = c.UpdateResources(map[string]types.Resource{"a": a}, nil) require.NoError(t, err) // This behavior is currently invalid for cds and lds, but due to a current limitation of linear cache sotw implementation - verifyResponse(t, w, c.getVersion(), 1) + resp := verifyResponseResources(t, w, resource.EndpointType, c.getVersion(), "a") + updateFromSotwResponse(resp, &sotwSub, sotwReq) checkVersionMapNotSet(t, c) sotwReq.VersionInfo = c.getVersion() @@ -887,7 +988,7 @@ func TestLinearMixedWatches(t *testing.T) { mustBlock(t, w) checkVersionMapNotSet(t, c) - deltaReq := &DeltaRequest{TypeUrl: testType, ResourceNamesSubscribe: []string{"a", "b"}, InitialResourceVersions: map[string]string{"a": hashA, 
"b": hashB}} + deltaReq := &DeltaRequest{TypeUrl: resource.EndpointType, ResourceNamesSubscribe: []string{"a", "b"}, InitialResourceVersions: map[string]string{"a": hashA, "b": hashB}} wd := make(chan DeltaResponse, 1) // Initial update @@ -900,14 +1001,13 @@ func TestLinearMixedWatches(t *testing.T) { err = c.UpdateResources(nil, []string{"b"}) require.NoError(t, err) checkVersionMapSet(t, c) - - verifyResponse(t, w, c.getVersion(), 0) - verifyDeltaResponse(t, wd, nil, []string{"b"}) + mustBlock(t, w) // For sotw with non full-state resources, we don't report deletions + verifyDeltaResponse(t, wd, nil, []string{"b"}, responseType(resource.EndpointType)) } func TestLinearSotwWatches(t *testing.T) { t.Run("watches are properly removed from all objects", func(t *testing.T) { - cache := NewLinearCache(testType) + cache := NewLinearCache(testType, WithLogger(log.NewTestLogger(t))) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} err := cache.UpdateResource("a", a) require.NoError(t, err) @@ -923,6 +1023,7 @@ func TestLinearSotwWatches(t *testing.T) { w := make(chan Response, 1) sotwReq := &Request{ResourceNames: []string{"a", "b", "c"}, TypeUrl: testType, VersionInfo: cache.getVersion()} sotwSub := subFromRequest(sotwReq) + sotwSub.SetReturnedResources(map[string]string{"a": "1", "b": "2"}) _, err = cache.CreateWatch(sotwReq, sotwSub, w) require.NoError(t, err) mustBlock(t, w) @@ -938,7 +1039,8 @@ func TestLinearSotwWatches(t *testing.T) { }} err = cache.UpdateResources(map[string]types.Resource{"a": a}, nil) require.NoError(t, err) - verifyResponseResources(t, w, testType, cache.getVersion(), "a") + resp := verifyResponseResources(t, w, testType, cache.getVersion(), "a") + updateFromSotwResponse(resp, &sotwSub, sotwReq) checkVersionMapNotSet(t, cache) assert.Empty(t, cache.watches["a"]) @@ -948,8 +1050,7 @@ func TestLinearSotwWatches(t *testing.T) { // c no longer watched w = make(chan Response, 1) sotwReq.ResourceNames = []string{"a", "b"} - sotwReq.VersionInfo 
= cache.getVersion() - sotwSub.SetResourceSubscription([]string{"a", "b"}) + sotwSub.SetResourceSubscription(sotwReq.ResourceNames) _, err = cache.CreateWatch(sotwReq, sotwSub, w) require.NoError(t, err) mustBlock(t, w) @@ -965,13 +1066,13 @@ func TestLinearSotwWatches(t *testing.T) { assert.Empty(t, cache.watches["c"]) require.NoError(t, err) - verifyResponseResources(t, w, testType, cache.getVersion(), "b") + resp = verifyResponseResources(t, w, testType, cache.getVersion(), "b") + updateFromSotwResponse(resp, &sotwSub, sotwReq) checkVersionMapNotSet(t, cache) w = make(chan Response, 1) sotwReq.ResourceNames = []string{"c"} - sotwReq.VersionInfo = cache.getVersion() - sotwSub.SetResourceSubscription([]string{"c"}) + sotwSub.SetResourceSubscription(sotwReq.ResourceNames) _, err = cache.CreateWatch(sotwReq, sotwSub, w) require.NoError(t, err) mustBlock(t, w) @@ -1005,6 +1106,7 @@ func TestLinearSotwWatches(t *testing.T) { // Non-wildcard request nonWildcardReq := &Request{ResourceNames: []string{"a", "b", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()} nonWildcardSub := subFromRequest(nonWildcardReq) + nonWildcardSub.SetReturnedResources(map[string]string{"a": cache.getVersion(), "b": cache.getVersion()}) w1 := make(chan Response, 1) _, err := cache.CreateWatch(nonWildcardReq, nonWildcardSub, w1) require.NoError(t, err) @@ -1014,6 +1116,7 @@ func TestLinearSotwWatches(t *testing.T) { // wildcard request wildcardReq := &Request{ResourceNames: nil, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()} wildcardSub := subFromRequest(wildcardReq) + wildcardSub.SetReturnedResources(map[string]string{"a": cache.getVersion(), "b": cache.getVersion(), "c": cache.getVersion()}) w2 := make(chan Response, 1) _, err = cache.CreateWatch(wildcardReq, wildcardSub, w2) require.NoError(t, err) @@ -1023,6 +1126,7 @@ func TestLinearSotwWatches(t *testing.T) { // request not requesting b otherReq := &Request{ResourceNames: []string{"a", "c", "d"}, 
TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()} otherSub := subFromRequest(otherReq) + otherSub.SetReturnedResources(map[string]string{"a": cache.getVersion(), "c": cache.getVersion()}) w3 := make(chan Response, 1) _, err = cache.CreateWatch(otherReq, otherSub, w3) require.NoError(t, err) @@ -1036,17 +1140,18 @@ func TestLinearSotwWatches(t *testing.T) { // Other watch has not triggered mustBlock(t, w3) - verifyResponseResources(t, w1, resource.ClusterType, cache.getVersion(), "a", "b") // a is also returned as cluster requires full state - verifyResponseResources(t, w2, resource.ClusterType, cache.getVersion(), "a", "b", "c") // a and c are also returned wildcard + resp1 := verifyResponseResources(t, w1, resource.ClusterType, cache.getVersion(), "a", "b") // a is also returned as cluster requires full state + updateFromSotwResponse(resp1, &nonWildcardSub, nonWildcardReq) + resp2 := verifyResponseResources(t, w2, resource.ClusterType, cache.getVersion(), "a", "b", "c") // a and c are also returned wildcard + updateFromSotwResponse(resp2, &wildcardSub, wildcardReq) // Recreate the watches w1 = make(chan Response, 1) - nonWildcardReq.VersionInfo = cache.getVersion() _, err = cache.CreateWatch(nonWildcardReq, nonWildcardSub, w1) require.NoError(t, err) mustBlock(t, w1) + w2 = make(chan Response, 1) - wildcardReq.VersionInfo = cache.getVersion() _, err = cache.CreateWatch(wildcardReq, wildcardSub, w2) require.NoError(t, err) mustBlock(t, w2) @@ -1061,3 +1166,309 @@ func TestLinearSotwWatches(t *testing.T) { verifyResponseResources(t, w3, resource.ClusterType, cache.getVersion(), "a", "c", "d") }) } + +func TestLinearSotwNonWildcard(t *testing.T) { + var cache *LinearCache + resourceType := resource.EndpointType + + reqs := make([]*discovery.DiscoveryRequest, 4) + subs := make([]stream.Subscription, 4) + watches := make([]chan Response, 4) + + buildRequest := func(res []string, version string) *discovery.DiscoveryRequest { + return 
&discovery.DiscoveryRequest{ + ResourceNames: res, + TypeUrl: resourceType, + VersionInfo: version, + } + } + updateReqResources := func(index int, res []string) { + t.Helper() + reqs[index-1].ResourceNames = res + subs[index-1].SetResourceSubscription(reqs[index-1].ResourceNames) + } + + createWatchWithCancel := func(index int) func() { + t.Helper() + w := make(chan Response, 1) + cancel, err := cache.CreateWatch(reqs[index-1], subs[index-1], w) + require.NoError(t, err) + watches[index-1] = w + return cancel + } + createWatch := func(index int) { + t.Helper() + _ = createWatchWithCancel(index) + } + validateResponse := func(index int, res []string) { + t.Helper() + resp := verifyResponseResources(t, watches[index-1], resourceType, cache.getVersion(), res...) + updateFromSotwResponse(resp, &subs[index-1], reqs[index-1]) + } + checkPendingWatch := func(index int) { + t.Helper() + mustBlock(t, watches[index-1]) + } + + // We run twice the same sequence of events, + // First run is for endpoints, currently not a full-state resource + // Second run is for clusters, currently a full-state resource + t.Run("type not returning full state", func(t *testing.T) { + resourceType := resource.EndpointType + buildEndpoint := func(name string) *endpoint.ClusterLoadAssignment { + return &endpoint.ClusterLoadAssignment{ClusterName: name} + } + + cache = NewLinearCache(resourceType, WithLogger(log.NewTestLogger(t)), WithInitialResources( + map[string]types.Resource{ + "a": buildEndpoint("a"), + "b": buildEndpoint("b"), + "c": buildEndpoint("c"), + }, + )) + + // Create watches + // Watch 1, wildcard, starting with the current cache version + reqs[0] = buildRequest(nil, cache.getVersion()) + subs[0] = subFromRequest(reqs[0]) + // Watch 2, wildcard, starting with no version (https://github.com/envoyproxy/go-control-plane/issues/855) + reqs[1] = buildRequest([]string{"*"}, "") + subs[1] = subFromRequest(reqs[1]) + // Watch 3, non-wildcard, starting with a different cache prefix + 
reqs[2] = buildRequest([]string{"a", "b"}, "prefix-"+cache.getVersion()) + subs[2] = subFromRequest(reqs[2]) + // Watch 4, non-wildcard, starting with no version + reqs[3] = buildRequest([]string{"d"}, "") + subs[3] = subFromRequest(reqs[3]) + + // Create watches + // Version is ignored as we cannot guarantee the state, so everything is returned + createWatch(1) + validateResponse(1, []string{"a", "b", "c"}) + // Standard first wildcard request + createWatch(2) + validateResponse(2, []string{"a", "b", "c"}) + // Version has a different prefix and we send everything requested + createWatch(3) + validateResponse(3, []string{"a", "b"}) + // No requested version is available, so we return an empty response on first request + createWatch(4) + validateResponse(4, []string{}) + + // Recreate watches + createWatch(1) + checkPendingWatch(1) + createWatch(2) + checkPendingWatch(2) + createWatch(3) + checkPendingWatch(3) + createWatch(4) + checkPendingWatch(4) + + // Update the cache + _ = cache.UpdateResources(map[string]types.Resource{ + "b": buildEndpoint("b"), + "d": buildEndpoint("d"), + }, []string{"a"}) + + validateResponse(1, []string{"b", "d"}) + validateResponse(2, []string{"b", "d"}) + validateResponse(3, []string{"b"}) + validateResponse(4, []string{"d"}) + + createWatch(1) + checkPendingWatch(1) + // Make watch 2 no longer wildcard + updateReqResources(2, []string{"a", "c", "d"}) + c2 := createWatchWithCancel(2) + checkPendingWatch(2) + c3 := createWatchWithCancel(3) + checkPendingWatch(3) + // Add a resource to watch 4 (https://github.com/envoyproxy/go-control-plane/issues/608) + updateReqResources(4, []string{"c", "d"}) + createWatch(4) + validateResponse(4, []string{"c"}) // c is newly requested, and should be returned + createWatch(4) + checkPendingWatch(4) + + // Add a new resource not requested in all subscriptions + _ = cache.UpdateResource("e", buildEndpoint("e")) + validateResponse(1, []string{"e"}) + createWatch(1) + checkPendingWatch(1) +
checkPendingWatch(2) // No longer wildcard + checkPendingWatch(3) + checkPendingWatch(4) + + // Cancel two watches to change resources + assert.Len(t, cache.watches["c"], 2) + c2() + assert.Len(t, cache.watches["c"], 1) + assert.Len(t, cache.watches["b"], 1) + c3() + assert.Len(t, cache.watches["b"], 0) + + // Remove a resource from 2 (was a, c, d) + updateReqResources(2, []string{"a", "d"}) + createWatch(2) + checkPendingWatch(2) + + // 3 is now wildcard (was a, b). The version still matches the previous one + updateReqResources(3, []string{"*"}) + createWatch(3) + validateResponse(3, []string{"c", "d", "e"}) + createWatch(3) + checkPendingWatch(3) + + // Do an update removing a resource only + // This type is not full update, and therefore does not return + _ = cache.UpdateResources(nil, []string{"c"}) + checkPendingWatch(1) + checkPendingWatch(2) + checkPendingWatch(3) + checkPendingWatch(4) + + // Do an update in the cache to confirm all is well + _ = cache.UpdateResources(map[string]types.Resource{ + "a": buildEndpoint("a"), + "b": buildEndpoint("b"), + }, nil) + validateResponse(1, []string{"a", "b"}) + validateResponse(2, []string{"a"}) + validateResponse(3, []string{"a", "b"}) + checkPendingWatch(4) + }) + + t.Run("type returning full state", func(t *testing.T) { + resourceType = resource.ClusterType + buildCluster := func(name string) *cluster.Cluster { + return &cluster.Cluster{Name: name} + } + + cache = NewLinearCache(resource.ClusterType, WithLogger(log.NewTestLogger(t)), WithInitialResources( + map[string]types.Resource{ + "a": buildCluster("a"), + "b": buildCluster("b"), + "c": buildCluster("c"), + }, + )) + + // Create watches + // Watch 1, wildcard, starting with the current cache version + reqs[0] = buildRequest(nil, cache.getVersion()) + subs[0] = subFromRequest(reqs[0]) + // Watch 2, wildcard, starting with no version (https://github.com/envoyproxy/go-control-plane/issues/855) + reqs[1] = buildRequest([]string{"*"}, "") + subs[1] = 
subFromRequest(reqs[1]) + // Watch 3, non-wildcard, starting with a different cache prefix + reqs[2] = buildRequest([]string{"a", "b"}, "prefix-"+cache.getVersion()) + subs[2] = subFromRequest(reqs[2]) + // Watch 4, non-wildcard, starting with no version + reqs[3] = buildRequest([]string{"d"}, "") + subs[3] = subFromRequest(reqs[3]) + + // Create watches + // Version is ignored as we cannot guarantee the state, so everything is returned + createWatch(1) + validateResponse(1, []string{"a", "b", "c"}) + // Standard first wildcard request + createWatch(2) + validateResponse(2, []string{"a", "b", "c"}) + // Version has a different prefix and we send everything requested + createWatch(3) + validateResponse(3, []string{"a", "b"}) + // No requested version is available, so we return an empty response on first request + createWatch(4) + validateResponse(4, []string{}) + + // Recreate watches + createWatch(1) + checkPendingWatch(1) + createWatch(2) + checkPendingWatch(2) + createWatch(3) + checkPendingWatch(3) + createWatch(4) + checkPendingWatch(4) + + // Update the cache + _ = cache.UpdateResources(map[string]types.Resource{ + "b": buildCluster("b"), + "d": buildCluster("d"), + }, []string{"a"}) + + validateResponse(1, []string{"b", "c", "d"}) + validateResponse(2, []string{"b", "c", "d"}) + validateResponse(3, []string{"b"}) + validateResponse(4, []string{"d"}) + + createWatch(1) + checkPendingWatch(1) + // Make watch 2 no longer wildcard + updateReqResources(2, []string{"a", "c", "d"}) + c2 := createWatchWithCancel(2) + checkPendingWatch(2) + c3 := createWatchWithCancel(3) + checkPendingWatch(3) + // Add a resource to req4 (https://github.com/envoyproxy/go-control-plane/issues/608) + updateReqResources(4, []string{"c", "d"}) + createWatch(4) + validateResponse(4, []string{"c", "d"}) // c is newly requested, and should be returned + createWatch(4) + checkPendingWatch(4) + + // Add a new resource not requested in all subscriptions + _ = cache.UpdateResource("e",
buildCluster("e")) + validateResponse(1, []string{"b", "c", "d", "e"}) + createWatch(1) + checkPendingWatch(1) + checkPendingWatch(2) // No longer wildcard + checkPendingWatch(3) + checkPendingWatch(4) + + // Cancel two watches to change resources + assert.Len(t, cache.watches["c"], 2) + c2() + assert.Len(t, cache.watches["c"], 1) + assert.Len(t, cache.watches["b"], 1) + c3() + assert.Len(t, cache.watches["b"], 0) + + // Remove a resource from 2 (was a, c, d) + updateReqResources(2, []string{"a", "d"}) + createWatch(2) + checkPendingWatch(2) + + // 3 is now wildcard (was a, b). The version still matches the previous one + updateReqResources(3, []string{"*"}) + createWatch(3) + validateResponse(3, []string{"b", "c", "d", "e"}) + createWatch(3) + checkPendingWatch(3) + + // Do an update removing a resource only + // This type is full-state, so watches covering the removed resource return the remaining resources + _ = cache.UpdateResources(nil, []string{"c"}) + validateResponse(1, []string{"b", "d", "e"}) + checkPendingWatch(2) + validateResponse(3, []string{"b", "d", "e"}) + validateResponse(4, []string{"d"}) + + createWatch(1) + checkPendingWatch(1) + createWatch(3) + checkPendingWatch(3) + createWatch(4) + checkPendingWatch(4) + + // Do an update in the cache to confirm all is well + _ = cache.UpdateResources(map[string]types.Resource{ + "a": buildCluster("a"), + "b": buildCluster("b"), + }, nil) + validateResponse(1, []string{"a", "b", "d", "e"}) + validateResponse(2, []string{"a", "d"}) + validateResponse(3, []string{"a", "b", "d", "e"}) + checkPendingWatch(4) + }) +} diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index 773c81b9e7..887f687b1b 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -206,8 +206,8 @@ func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) { if len(resourcesWithTTL) == 0 { continue } - cache.log.Debugf("respond open watch %d%v with heartbeat for version %q", id, watch.Request.GetResourceNames(), version) - err :=
cache.respond(ctx, watch.Request, watch.Response, resourcesWithTTL, version, true) + cache.log.Debugf("respond open watch %d %v with heartbeat for version %q", id, watch.Request.GetResourceNames(), version) + err := cache.respond(ctx, watch, resourcesWithTTL, version, true) if err != nil { cache.log.Errorf("received error when attempting to respond to watches: %v", err) } @@ -250,9 +250,9 @@ func (cache *snapshotCache) respondSOTWWatches(ctx context.Context, info *status respond := func(watch ResponseWatch, id int64) error { version := snapshot.GetVersion(watch.Request.GetTypeUrl()) if version != watch.Request.GetVersionInfo() { - cache.log.Debugf("respond open watch %d %s%v with new version %q", id, watch.Request.GetTypeUrl(), watch.Request.GetResourceNames(), version) + cache.log.Debugf("respond open watch %d %s %v with new version %q", id, watch.Request.GetTypeUrl(), watch.Request.GetResourceNames(), version) resources := snapshot.GetResourcesAndTTL(watch.Request.GetTypeUrl()) - err := cache.respond(ctx, watch.Request, watch.Response, resources, version, false) + err := cache.respond(ctx, watch, resources, version, false) if err != nil { return err } @@ -402,54 +402,58 @@ func (cache *snapshotCache) CreateWatch(request *Request, sub Subscription, valu info.lastWatchRequestTime = time.Now() info.mu.Unlock() - var version string + createWatch := func(watch ResponseWatch) func() { + watchID := cache.nextWatchID() + cache.log.Debugf("open watch %d for %s %v from nodeID %q, version %q", watchID, request.GetTypeUrl(), sub.SubscribedResources(), nodeID, request.GetVersionInfo()) + info.mu.Lock() + info.watches[watchID] = watch + info.mu.Unlock() + return cache.cancelWatch(nodeID, watchID) + } + + watch := ResponseWatch{Request: request, Response: value, subscription: sub} + snapshot, exists := cache.snapshots[nodeID] - if exists { - version = snapshot.GetVersion(request.GetTypeUrl()) + if !exists { + return createWatch(watch), nil } - if exists { + version := 
snapshot.GetVersion(request.GetTypeUrl()) + resources := snapshot.GetResourcesAndTTL(request.GetTypeUrl()) + + if request.GetVersionInfo() == version { + // Retrieve whether there are resources in the cache requested and currently unknown to the client. knownResourceNames := sub.ReturnedResources() - diff := []string{} - for _, r := range request.GetResourceNames() { - if _, ok := knownResourceNames[r]; !ok { - diff = append(diff, r) + diff := false + if !sub.IsWildcard() { + // Check if a resource requested is currently not returned, + // for instance if it is newly subscribed + for r := range sub.SubscribedResources() { + if _, ok := knownResourceNames[r]; !ok { + diff = true + break + } } - } - - cache.log.Debugf("nodeID %q requested %s%v and known %v. Diff %v", nodeID, - request.GetTypeUrl(), request.GetResourceNames(), knownResourceNames, diff) - - if len(diff) > 0 { - resources := snapshot.GetResourcesAndTTL(request.GetTypeUrl()) - for _, name := range diff { - if _, exists := resources[name]; exists { - if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil { - cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.GetTypeUrl(), - request.GetResourceNames(), nodeID, err) - return nil, fmt.Errorf("failed to send the response: %w", err) - } - return func() {}, nil + } else { + // Check if a resource present in the snapshot is currently not returned, + // for instance if the subscription is newly wildcard. + for r := range snapshot.GetResources(request.GetTypeUrl()) { + if _, ok := knownResourceNames[r]; !ok { + diff = true + break } } } + // The version has not changed, and the client already has all requested resources. 
+ if !diff { + return createWatch(watch), nil + } } - // if the requested version is up-to-date or missing a response, leave an open watch - if !exists || request.GetVersionInfo() == version { - watchID := cache.nextWatchID() - cache.log.Debugf("open watch %d for %s%v from nodeID %q, version %q", watchID, request.GetTypeUrl(), request.GetResourceNames(), nodeID, request.GetVersionInfo()) - info.mu.Lock() - info.watches[watchID] = ResponseWatch{Request: request, Response: value} - info.mu.Unlock() - return cache.cancelWatch(nodeID, watchID), nil - } - - // otherwise, the watch may be responded immediately - resources := snapshot.GetResourcesAndTTL(request.GetTypeUrl()) - if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil { + // The version is different, or the client is subscribed to resources not yet returned to it. + if err := cache.respond(context.Background(), watch, resources, version, false); err != nil { cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.GetTypeUrl(), - request.GetResourceNames(), nodeID, err) + sub.SubscribedResources(), nodeID, err) return nil, fmt.Errorf("failed to send the response: %w", err) } @@ -476,12 +480,13 @@ func (cache *snapshotCache) cancelWatch(nodeID string, watchID int64) func() { // Respond to a watch with the snapshot value. The value channel should have capacity not to block. 
// TODO(kuat) do not respond always, see issue https://github.com/envoyproxy/go-control-plane/issues/46 -func (cache *snapshotCache) respond(ctx context.Context, request *Request, value chan Response, resources map[string]types.ResourceWithTTL, version string, heartbeat bool) error { +func (cache *snapshotCache) respond(ctx context.Context, watch ResponseWatch, resources map[string]types.ResourceWithTTL, version string, heartbeat bool) error { + request := watch.Request // for ADS, the request names must match the snapshot names // if they do not, then the watch is never responded, and it is expected that envoy makes another request if len(request.GetResourceNames()) != 0 && cache.ads { if err := superset(nameSet(request.GetResourceNames()), resources); err != nil { - cache.log.Warnf("ADS mode: not responding to request %s%v: %v", request.GetTypeUrl(), request.GetResourceNames(), err) + cache.log.Warnf("ADS mode: not responding to request %s %v: %v", request.GetTypeUrl(), request.GetResourceNames(), err) return nil } } @@ -489,7 +494,7 @@ func (cache *snapshotCache) respond(ctx context.Context, request *Request, value cache.log.Debugf("respond %s%v version %q with version %q", request.GetTypeUrl(), request.GetResourceNames(), request.GetVersionInfo(), version) select { - case value <- createResponse(ctx, request, resources, version, heartbeat): + case watch.Response <- createResponse(ctx, request, resources, version, heartbeat): return nil case <-ctx.Done(): return context.Canceled @@ -498,6 +503,7 @@ func (cache *snapshotCache) respond(ctx context.Context, request *Request, value func createResponse(ctx context.Context, request *Request, resources map[string]types.ResourceWithTTL, version string, heartbeat bool) Response { filtered := make([]types.ResourceWithTTL, 0, len(resources)) + returnedResources := make(map[string]string, len(resources)) // Reply only with the requested resources. Envoy may ask each resource // individually in a separate stream. 
It is ok to reply with the same version @@ -507,20 +513,23 @@ func createResponse(ctx context.Context, request *Request, resources map[string] for name, resource := range resources { if set[name] { filtered = append(filtered, resource) + returnedResources[name] = version } } } else { - for _, resource := range resources { + for name, resource := range resources { filtered = append(filtered, resource) + returnedResources[name] = version } } return &RawResponse{ - Request: request, - Version: version, - Resources: filtered, - Heartbeat: heartbeat, - Ctx: ctx, + Request: request, + Version: version, + Resources: filtered, + ReturnedResources: returnedResources, + Heartbeat: heartbeat, + Ctx: ctx, } } diff --git a/pkg/cache/v3/simple_test.go b/pkg/cache/v3/simple_test.go index 8bfb698127..95991d52d3 100644 --- a/pkg/cache/v3/simple_test.go +++ b/pkg/cache/v3/simple_test.go @@ -34,6 +34,7 @@ import ( discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/log" rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" @@ -52,6 +53,19 @@ func (group) ID(node *core.Node) string { return key } +func subFromRequest(req *cache.Request) stream.Subscription { + return stream.NewSotwSubscription(req.GetResourceNames()) +} + +// This method represents the expected behavior of client and servers regarding the request and the subscription. 
+// For edge cases it should ignore those +func updateFromSotwResponse(resp cache.Response, sub *stream.Subscription, req *cache.Request) { + sub.SetReturnedResources(resp.GetReturnedResources()) + // Never returns an error when not using passthrough responses + version, _ := resp.GetVersion() + req.VersionInfo = version +} + var ( ttl = 2 * time.Second snapshotWithTTL, _ = cache.NewSnapshotWithTTLs(fixture.version, map[rsrc.Type][]types.ResourceWithTTL{ @@ -87,34 +101,10 @@ var ( } ) -type logger struct { - t *testing.T -} - -func (log logger) Debugf(format string, args ...interface{}) { - log.t.Helper() - log.t.Logf(format, args...) -} - -func (log logger) Infof(format string, args ...interface{}) { - log.t.Helper() - log.t.Logf(format, args...) -} - -func (log logger) Warnf(format string, args ...interface{}) { - log.t.Helper() - log.t.Logf(format, args...) -} - -func (log logger) Errorf(format string, args ...interface{}) { - log.t.Helper() - log.t.Logf(format, args...) -} - func TestSnapshotCacheWithTTL(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := cache.NewSnapshotCacheWithHeartbeating(ctx, true, group{}, logger{t: t}, time.Second) + c := cache.NewSnapshotCacheWithHeartbeating(ctx, true, group{}, log.NewTestLogger(t), time.Second) if _, err := c.GetSnapshot(key); err == nil { t.Errorf("unexpected snapshot found for key %q", key) @@ -136,12 +126,13 @@ func TestSnapshotCacheWithTTL(t *testing.T) { // All the resources should respond immediately when version is not up to date. 
subs := map[string]stream.Subscription{} for _, typ := range testTypes { - sub := stream.NewSotwSubscription(names[typ]) wg.Add(1) t.Run(typ, func(t *testing.T) { defer wg.Done() + req := &discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]} + sub := subFromRequest(req) value := make(chan cache.Response, 1) - _, err = c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, sub, value) + _, err = c.CreateWatch(req, sub, value) require.NoError(t, err) select { case out := <-value: @@ -152,12 +143,7 @@ func TestSnapshotCacheWithTTL(t *testing.T) { t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTTL.GetResourcesAndTTL(typ)) } - returnedResources := make(map[string]string) - // Update sub to track what was returned - for _, resource := range out.GetRequest().GetResourceNames() { - returnedResources[resource] = fixture.version - } - sub.SetReturnedResources(returnedResources) + updateFromSotwResponse(out, &sub, req) subs[typ] = sub case <-time.After(2 * time.Second): t.Errorf("failed to receive snapshot response") @@ -224,7 +210,7 @@ func TestSnapshotCacheWithTTL(t *testing.T) { } func TestSnapshotCache(t *testing.T) { - c := cache.NewSnapshotCache(true, group{}, logger{t: t}) + c := cache.NewSnapshotCache(true, group{}, log.NewTestLogger(t)) if _, err := c.GetSnapshot(key); err == nil { t.Errorf("unexpected snapshot found for key %q", key) @@ -245,9 +231,9 @@ func TestSnapshotCache(t *testing.T) { // try to get endpoints with incorrect list of names // should not receive response value := make(chan cache.Response, 1) - sub := stream.NewSotwSubscription([]string{"none"}) - _, err = c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}}, - sub, value) + req := &discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}} + sub := subFromRequest(req) + _, err = c.CreateWatch(req, sub, value) require.NoError(t, err) 
select { case out := <-value: @@ -258,9 +244,9 @@ func TestSnapshotCache(t *testing.T) { for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { value := make(chan cache.Response, 1) - sub := stream.NewSotwSubscription(names[typ]) - _, err = c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, - sub, value) + req := &discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]} + sub := subFromRequest(req) + _, err = c.CreateWatch(req, sub, value) require.NoError(t, err) select { case out := <-value: @@ -279,7 +265,7 @@ func TestSnapshotCache(t *testing.T) { } func TestSnapshotCacheFetch(t *testing.T) { - c := cache.NewSnapshotCache(true, group{}, logger{t: t}) + c := cache.NewSnapshotCache(true, group{}, log.NewTestLogger(t)) if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil { t.Fatal(err) } @@ -310,14 +296,15 @@ func TestSnapshotCacheFetch(t *testing.T) { } func TestSnapshotCacheWatch(t *testing.T) { - c := cache.NewSnapshotCache(true, group{}, logger{t: t}) + c := cache.NewSnapshotCache(true, group{}, log.NewTestLogger(t)) watches := make(map[string]chan cache.Response) subs := map[string]stream.Subscription{} for _, typ := range testTypes { - sub := stream.NewSotwSubscription(names[typ]) + req := &discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]} + sub := subFromRequest(req) subs[typ] = sub watches[typ] = make(chan cache.Response, 1) - _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, sub, watches[typ]) + _, err := c.CreateWatch(req, sub, watches[typ]) require.NoError(t, err) } if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil { @@ -340,7 +327,7 @@ func TestSnapshotCacheWatch(t *testing.T) { returnedResources[resource] = fixture.version } sub := subs[typ] - sub.SetReturnedResources(returnedResources) + updateFromSotwResponse(out, &sub, out.GetRequest()) subs[typ] = sub case 
<-time.After(time.Second): t.Fatal("failed to receive snapshot response") @@ -384,7 +371,7 @@ func TestSnapshotCacheWatch(t *testing.T) { } func TestConcurrentSetWatch(t *testing.T) { - c := cache.NewSnapshotCache(false, group{}, logger{t: t}) + c := cache.NewSnapshotCache(false, group{}, log.NewTestLogger(t)) for i := 0; i < 50; i++ { i := i t.Run(fmt.Sprintf("worker%d", i), func(t *testing.T) { @@ -398,11 +385,11 @@ func TestConcurrentSetWatch(t *testing.T) { t.Fatalf("failed to set snapshot %q: %s", id, err) } } else { - sub := stream.NewSotwSubscription(nil) - cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{ + req := &discovery.DiscoveryRequest{ Node: &core.Node{Id: id}, TypeUrl: rsrc.EndpointType, - }, sub, value) + } + cancel, err := c.CreateWatch(req, subFromRequest(req), value) require.NoError(t, err) defer cancel() } @@ -411,11 +398,11 @@ func TestConcurrentSetWatch(t *testing.T) { } func TestSnapshotCacheWatchCancel(t *testing.T) { - c := cache.NewSnapshotCache(true, group{}, logger{t: t}) + c := cache.NewSnapshotCache(true, group{}, log.NewTestLogger(t)) for _, typ := range testTypes { - sub := stream.NewSotwSubscription(names[typ]) + req := &discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]} value := make(chan cache.Response, 1) - cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, sub, value) + cancel, err := c.CreateWatch(req, subFromRequest(req), value) require.NoError(t, err) cancel() } @@ -436,13 +423,13 @@ func TestSnapshotCacheWatchCancel(t *testing.T) { } func TestSnapshotCacheWatchTimeout(t *testing.T) { - c := cache.NewSnapshotCache(true, group{}, logger{t: t}) + c := cache.NewSnapshotCache(true, group{}, log.NewTestLogger(t)) // Create a non-buffered channel that will block sends. 
watchCh := make(chan cache.Response) - sub := stream.NewSotwSubscription(names[rsrc.EndpointType]) - _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: names[rsrc.EndpointType]}, - sub, watchCh) + req := &discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: names[rsrc.EndpointType]} + sub := subFromRequest(req) + _, err := c.CreateWatch(req, sub, watchCh) require.NoError(t, err) // The first time we set the snapshot without consuming from the blocking channel, so this should time out. @@ -479,7 +466,7 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { clusterName2 := "clusterName2" routeName2 := "routeName2" listenerName2 := "listenerName2" - c := cache.NewSnapshotCache(false, group{}, logger{t: t}) + c := cache.NewSnapshotCache(false, group{}, log.NewTestLogger(t)) snapshot2, _ := cache.NewSnapshot(fixture.version, map[rsrc.Type][]types.Resource{ rsrc.EndpointType: {testEndpoint, resource.MakeEndpoint(clusterName2, 8080)}, @@ -497,8 +484,8 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { // Request resource with name=ClusterName go func() { - _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{clusterName}}, - stream.NewSotwSubscription([]string{clusterName}), watch) + req := &discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{clusterName}} + _, err := c.CreateWatch(req, subFromRequest(req), watch) require.NoError(t, err) }() @@ -517,12 +504,13 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { // Request additional resource with name=clusterName2 for same version go func() { - sub := stream.NewSotwSubscription([]string{clusterName, clusterName2}) - sub.SetReturnedResources(map[string]string{clusterName: fixture.version}) - _, err := c.CreateWatch(&discovery.DiscoveryRequest{ + req := &discovery.DiscoveryRequest{ TypeUrl: 
rsrc.EndpointType, VersionInfo: fixture.version, ResourceNames: []string{clusterName, clusterName2}, - }, sub, watch) + } + sub := subFromRequest(req) + sub.SetReturnedResources(map[string]string{clusterName: fixture.version}) + _, err := c.CreateWatch(req, sub, watch) require.NoError(t, err) }() @@ -539,12 +527,13 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { } // Repeat request for with same version and make sure a watch is created - sub := stream.NewSotwSubscription([]string{clusterName, clusterName2}) - sub.SetReturnedResources(map[string]string{clusterName: fixture.version, clusterName2: fixture.version}) - cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{ + req := &discovery.DiscoveryRequest{ TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version, ResourceNames: []string{clusterName, clusterName2}, - }, sub, watch) + } + sub := subFromRequest(req) + sub.SetReturnedResources(map[string]string{clusterName: fixture.version, clusterName2: fixture.version}) + cancel, err := c.CreateWatch(req, sub, watch) require.NoError(t, err) if cancel == nil { t.Fatal("Should create a watch") @@ -554,7 +543,7 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { } func TestSnapshotClear(t *testing.T) { - c := cache.NewSnapshotCache(true, group{}, logger{t: t}) + c := cache.NewSnapshotCache(true, group{}, log.NewTestLogger(t)) if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil { t.Fatal(err) } @@ -636,7 +625,7 @@ func TestSnapshotSingleResourceFetch(t *testing.T) { return dst } - c := cache.NewSnapshotCache(true, group{}, logger{t: t}) + c := cache.NewSnapshotCache(true, group{}, log.NewTestLogger(t)) require.NoError(t, c.SetSnapshot(context.Background(), key, &singleResourceSnapshot{ version: "version-one", typeurl: durationTypeURL, diff --git a/pkg/cache/v3/status.go b/pkg/cache/v3/status.go index 18465f6dc5..b2a3063c69 100644 --- a/pkg/cache/v3/status.go +++ 
b/pkg/cache/v3/status.go @@ -90,6 +90,9 @@ type ResponseWatch struct { // Response is the channel to push responses to. Response chan Response + + // Subscription stores the current client subscription state. + subscription Subscription } // DeltaResponseWatch is a watch record keeping both the delta request and an open channel for the delta response. diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index 4d056ea1f2..ec214fba6c 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -121,17 +121,7 @@ func (s *streamWrapper) send(resp cache.Response) error { } // Track in the type subcription the nonce and objects returned to the client. - version, err := resp.GetVersion() - if err != nil { - return err - } - // ToDo(valerian-roche): properly return the resources actually sent to the client - // Currently we set all resources requested, which is non-descriptive when using wildcard. - resources := make(map[string]string, len(resp.GetRequest().GetResourceNames())) - for _, r := range resp.GetRequest().GetResourceNames() { - resources[r] = version - } - w.sub.SetReturnedResources(resources) + w.sub.SetReturnedResources(resp.GetReturnedResources()) w.nonce = out.Nonce // Register with the callbacks provided that we are sending the response. diff --git a/pkg/server/stream/v3/subscription.go b/pkg/server/stream/v3/subscription.go index 106d97844e..bf878ed788 100644 --- a/pkg/server/stream/v3/subscription.go +++ b/pkg/server/stream/v3/subscription.go @@ -74,6 +74,17 @@ func (s *Subscription) SetResourceSubscription(subscribed []string) { } } + if !explicitWildcardSet { + // Cleanup resources no longer subscribed to. 
+ // This ensures later subscriptions will trigger responses, + // even if the version has not changed + for resource := range s.returnedResources { + if _, ok := subscribedResources[resource]; !ok { + delete(s.returnedResources, resource) + } + } + } + // Explicit subscription to wildcard as we are not in legacy wildcard behavior s.wildcard = explicitWildcardSet s.subscribedResourceNames = subscribedResources @@ -132,6 +143,10 @@ func (s *Subscription) UpdateResourceSubscriptions(subscribed, unsubscribed []st // * detect the version change, and return the resource (as an update) // * detect the resource deletion, and set it as removed in the response s.returnedResources[resource] = "" + } else { + // Cleanup unsubscribed resources. This avoids returning a response + // if the versions have not changed + delete(s.returnedResources, resource) } delete(s.subscribedResourceNames, resource) }