diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index 55b664a12a..1c6d228222 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -36,16 +36,16 @@ type Request = discovery.DiscoveryRequest // DeltaRequest is an alias for the delta discovery request type. type DeltaRequest = discovery.DeltaDiscoveryRequest -// SubscriptionState provides additional data on the client knowledge for the type matching the request -// This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources) +// SubscriptionState stores the server view of the client state for a given resource type. +// This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources). // Though the methods may return mutable parts of the state for performance reasons, -// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation +// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation. type SubscriptionState interface { - // GetKnownResources returns a list of resources that the client has ACK'd and their associated version. + // GetACKedResources returns a list of resources that the client has ACK'd and their associated version. // The versions are: // - delta protocol: version of the specific resource set in the response // - sotw protocol: version of the global response when the resource was last ACKed - GetKnownResources() map[string]string + GetACKedResources() map[string]string // GetSubscribedResources returns the list of resources currently subscribed to by the client for the type. // For delta it keeps track of subscription updates across requests diff --git a/pkg/cache/v3/delta.go b/pkg/cache/v3/delta.go index 9dc3f87127..276dce4e60 100644 --- a/pkg/cache/v3/delta.go +++ b/pkg/cache/v3/delta.go @@ -36,7 +36,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscript // If we are handling a wildcard request, we want to respond with all resources switch { case state.IsWildcard(): - if len(state.GetKnownResources()) == 0 { + if len(state.GetACKedResources()) == 0 { filtered = make([]types.Resource, 0, len(resources.resourceMap)) } nextVersionMap = make(map[string]string, len(resources.resourceMap)) @@ -45,7 +45,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscript // we can just set it here to be used for comparison later version := resources.versionMap[name] nextVersionMap[name] = version - prevVersion, found := state.GetKnownResources()[name] + prevVersion, found := state.GetACKedResources()[name] if !found || (prevVersion != version) { filtered = append(filtered, r) } @@ -53,7 +53,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscript // Compute resources for removal // The resource version can be set to "" here to trigger a removal even if never returned before - for name := range state.GetKnownResources() { + for name := range state.GetACKedResources() { if _, ok := resources.resourceMap[name]; !ok { toRemove = append(toRemove, name) } @@ -63,7 +63,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscript // state.GetResourceVersions() may include resources no longer subscribed // In the current code this gets silently cleaned when updating the version map for name := range state.GetSubscribedResources() { - prevVersion, found := state.GetKnownResources()[name] + prevVersion, found := state.GetACKedResources()[name] if r, ok := resources.resourceMap[name]; ok { nextVersion := resources.versionMap[name] if prevVersion != nextVersion { diff --git a/pkg/cache/v3/delta_test.go b/pkg/cache/v3/delta_test.go index 811e256a36..445cdf0a61 100644 --- a/pkg/cache/v3/delta_test.go +++ b/pkg/cache/v3/delta_test.go @@ -144,7 +144,7 @@ func TestDeltaRemoveResources(t *testing.T) { case out := <-watches[typ]: assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ)) nextVersionMap := out.GetNextVersionMap() - streams[typ].SetKnownResources(nextVersionMap) + streams[typ].SetACKedResources(nextVersionMap) case <-time.After(time.Second): require.Fail(t, "failed to receive a snapshot response") } @@ -181,7 +181,7 @@ func TestDeltaRemoveResources(t *testing.T) { assert.Equal(t, []string{"otherCluster"}, out.(*cache.RawDeltaResponse).RemovedResources) nextVersionMap := out.GetNextVersionMap() // make sure the version maps are different since we no longer are tracking any endpoint resources - assert.NotEqual(t, nextVersionMap, streams[testTypes[0]].GetKnownResources(), "versionMap for the endpoint resource type did not change") + assert.NotEqual(t, nextVersionMap, streams[testTypes[0]].GetACKedResources(), "versionMap for the endpoint resource type did not change") case <-time.After(time.Second): assert.Fail(t, "failed to receive snapshot response") } diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 3a397f6f66..1414e9893e 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -195,7 +195,7 @@ func createWildcardDeltaWatch(c *LinearCache, w chan DeltaResponse) error { return err } resp := <-w - state.SetKnownResources(resp.GetNextVersionMap()) + state.SetACKedResources(resp.GetNextVersionMap()) _, err := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) // Ensure the watch is set properly with cache values return err } @@ -674,7 +674,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil) checkVersionMapSet(t, c) assert.Equal(t, 2, c.NumResources()) - state.SetKnownResources(resp.GetNextVersionMap()) + state.SetACKedResources(resp.GetNextVersionMap()) // Multiple updates _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) @@ -695,7 +695,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil) checkVersionMapSet(t, c) assert.Equal(t, 2, c.NumResources()) - state.SetKnownResources(resp.GetNextVersionMap()) + state.SetACKedResources(resp.GetNextVersionMap()) // Update/add/delete _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) @@ -715,7 +715,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}}, []string{"b"}) checkVersionMapSet(t, c) assert.Equal(t, 2, c.NumResources()) - state.SetKnownResources(resp.GetNextVersionMap()) + state.SetACKedResources(resp.GetNextVersionMap()) // Re-add previously deleted watched resource _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) @@ -732,7 +732,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { validateDeltaResponse(t, resp, []resourceInfo{{"b", hashB}}, nil) // d is not watched and should not be returned checkVersionMapSet(t, c) assert.Equal(t, 2, c.NumResources()) - state.SetKnownResources(resp.GetNextVersionMap()) + state.SetACKedResources(resp.GetNextVersionMap()) // Wildcard create/update require.NoError(t, createWildcardDeltaWatch(c, w)) diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index b0840eb7ff..d2c0d9a88f 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -408,7 +408,7 @@ func (cache *snapshotCache) CreateWatch(request *Request, clientState Subscripti } if exists { - knownResourceNames := clientState.GetKnownResources() + knownResourceNames := clientState.GetACKedResources() diff := []string{} for _, r := range request.GetResourceNames() { if _, ok := knownResourceNames[r]; !ok { diff --git a/pkg/cache/v3/simple_test.go b/pkg/cache/v3/simple_test.go index b65e16a8e6..8b29f9cdbd 100644 --- a/pkg/cache/v3/simple_test.go +++ b/pkg/cache/v3/simple_test.go @@ -149,7 +149,7 @@ func TestSnapshotCacheWithTTL(t *testing.T) { } // Update streamState for _, resource := range out.GetRequest().GetResourceNames() { - streamState.GetKnownResources()[resource] = fixture.version + streamState.GetACKedResources()[resource] = fixture.version } case <-time.After(2 * time.Second): t.Errorf("failed to receive snapshot response") @@ -189,7 +189,7 @@ func TestSnapshotCacheWithTTL(t *testing.T) { updatesByType[typ]++ for _, resource := range out.GetRequest().GetResourceNames() { - streamState.GetKnownResources()[resource] = fixture.version + streamState.GetACKedResources()[resource] = fixture.version } case <-end: cancel() @@ -321,7 +321,7 @@ func TestSnapshotCacheWatch(t *testing.T) { t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTTL(typ)) } for _, resource := range out.GetRequest().GetResourceNames() { - streamState.GetKnownResources()[resource] = fixture.version + streamState.GetACKedResources()[resource] = fixture.version } case <-time.After(time.Second): t.Fatal("failed to receive snapshot response") @@ -501,7 +501,7 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { // Request additional resource with name=clusterName2 for same version go func() { state := stream.NewSubscriptionState(false, map[string]string{}) - state.SetKnownResources(map[string]string{clusterName: fixture.version}) + state.SetACKedResources(map[string]string{clusterName: fixture.version}) _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version, ResourceNames: []string{clusterName, clusterName2}}, &state, watch) require.NoError(t, err) @@ -521,7 +521,7 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { // Repeat request for with same version and make sure a watch is created state := stream.NewSubscriptionState(false, map[string]string{}) - state.SetKnownResources(map[string]string{clusterName: fixture.version, clusterName2: fixture.version}) + state.SetACKedResources(map[string]string{clusterName: fixture.version, clusterName2: fixture.version}) if cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version, ResourceNames: []string{clusterName, clusterName2}}, &state, watch); cancel == nil { t.Fatal("Should create a watch") diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index 514872b016..89a85eba02 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -118,7 +118,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De watch := watches.deltaWatches[typ] watch.nonce = nonce - watch.state.SetKnownResources(resp.GetNextVersionMap()) + watch.state.SetACKedResources(resp.GetNextVersionMap()) watches.deltaWatches[typ] = watch return nil } @@ -285,7 +285,7 @@ func (s *server) unsubscribe(resources []string, streamState *stream.Subscriptio // To achieve that, we mark the resource as having been returned with an empty version. While creating the response, the cache will either: // * detect the version change, and return the resource (as an update) // * detect the resource deletion, and set it as removed in the response - streamState.GetKnownResources()[resource] = "" + streamState.GetACKedResources()[resource] = "" } delete(sv, resource) } diff --git a/pkg/server/sotw/v3/ads.go b/pkg/server/sotw/v3/ads.go index 92e2f4f8f0..98903d201b 100644 --- a/pkg/server/sotw/v3/ads.go +++ b/pkg/server/sotw/v3/ads.go @@ -103,7 +103,7 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe } typeURL := req.GetTypeUrl() - streamState, ok := sw.streamStates[typeURL] + streamState, ok := sw.streamState[typeURL] if !ok { // Supports legacy wildcard mode // Wildcard will be set to true if no resource is set @@ -114,7 +114,7 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe if lastResponse, ok := sw.lastDiscoveryResponses[typeURL]; ok { if lastResponse.nonce == "" || lastResponse.nonce == nonce { // Let's record Resource names that a client has received. - streamState.SetKnownResources(lastResponse.resources) + streamState.SetACKedResources(lastResponse.resources) } } @@ -157,7 +157,7 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe }) } - sw.streamStates[req.TypeUrl] = streamState + sw.streamState[req.TypeUrl] = streamState } } } diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index 410b62c92d..8144e46d41 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -91,7 +91,7 @@ type streamWrapper struct { // The below fields are used for tracking resource // cache state and should be maintained per stream. - streamStates map[string]stream.SubscriptionState + streamState map[string]stream.SubscriptionState lastDiscoveryResponses map[string]lastDiscoveryResponse } diff --git a/pkg/server/sotw/v3/xds.go b/pkg/server/sotw/v3/xds.go index 2c58b0843b..87b7350be1 100644 --- a/pkg/server/sotw/v3/xds.go +++ b/pkg/server/sotw/v3/xds.go @@ -27,7 +27,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque // a collection of stack allocated watches per request type. watches: newWatches(), - streamStates: make(map[string]stream.SubscriptionState), + streamState: make(map[string]stream.SubscriptionState), lastDiscoveryResponses: make(map[string]lastDiscoveryResponse), } @@ -110,7 +110,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque req.TypeUrl = defaultTypeURL } - streamState := sw.streamStates[req.TypeUrl] + streamState := sw.streamState[req.TypeUrl] if s.callbacks != nil { if err := s.callbacks.OnStreamRequest(sw.ID, req); err != nil { @@ -121,7 +121,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque if lastResponse, ok := sw.lastDiscoveryResponses[req.GetTypeUrl()]; ok { if lastResponse.nonce == "" || lastResponse.nonce == nonce { // Let's record Resource names that a client has received. - streamState.SetKnownResources(lastResponse.resources) + streamState.SetACKedResources(lastResponse.resources) } } @@ -157,7 +157,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque }) } - sw.streamStates[req.TypeUrl] = streamState + sw.streamState[req.TypeUrl] = streamState // Recompute the dynamic select cases for this stream. sw.watches.recompute(s.ctx, reqCh) diff --git a/pkg/server/stream/v3/subscription.go b/pkg/server/stream/v3/subscription.go index c5a17b46b8..84da9211cc 100644 --- a/pkg/server/stream/v3/subscription.go +++ b/pkg/server/stream/v3/subscription.go @@ -1,17 +1,16 @@ package stream -// SubscriptionState will keep track of a resource subscription on a stream. +// SubscriptionState stores the server view of a given type subscription in a stream. type SubscriptionState struct { - // wildcard is set if the subscription currently has a wildcard watch + // wildcard indicates if the subscription currently has a wildcard watch. wildcard bool // subscribedResourceNames provides the resources explicitly requested by the client - // This list might be non-empty even when set as wildcard + // This list might be non-empty even when set as wildcard. subscribedResourceNames map[string]struct{} - // resourceVersions contains the resources acknowledged by the client and the versions - // associated to them - resourceVersions map[string]string + // ackedResources contains the resources acknowledged by the client and the acknowledged versions. + ackedResources map[string]string } // NewSubscriptionState initializes a stream state. @@ -19,11 +18,11 @@ func NewSubscriptionState(wildcard bool, initialResourceVersions map[string]stri state := SubscriptionState{ wildcard: wildcard, subscribedResourceNames: map[string]struct{}{}, - resourceVersions: initialResourceVersions, + ackedResources: initialResourceVersions, } if initialResourceVersions == nil { - state.resourceVersions = make(map[string]string) + state.ackedResources = make(map[string]string) } return state @@ -43,16 +42,16 @@ func (s *SubscriptionState) SetSubscribedResources(subscribedResourceNames map[s s.subscribedResourceNames = subscribedResourceNames } -// GetKnownResources returns the list of resources acknowledged by the client +// GetACKedResources returns the list of resources acknowledged by the client // and their acknowledged version -func (s SubscriptionState) GetKnownResources() map[string]string { - return s.resourceVersions +func (s SubscriptionState) GetACKedResources() map[string]string { + return s.ackedResources } -// SetKnownResources sets a list of resource versions currently known by the client +// SetACKedResources sets a list of resource versions currently known by the client // The cache can use this state to compute resources added/updated/deleted -func (s *SubscriptionState) SetKnownResources(resourceVersions map[string]string) { - s.resourceVersions = resourceVersions +func (s *SubscriptionState) SetACKedResources(resourceVersions map[string]string) { + s.ackedResources = resourceVersions } // SetWildcard will set the subscription to return all known resources diff --git a/pkg/server/v3/delta_test.go b/pkg/server/v3/delta_test.go index 077522c1de..95e8a3dacc 100644 --- a/pkg/server/v3/delta_test.go +++ b/pkg/server/v3/delta_test.go @@ -37,7 +37,7 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR // If we are handling a wildcard request, we want to respond with all resources switch { case state.IsWildcard(): - if len(state.GetKnownResources()) == 0 { + if len(state.GetACKedResources()) == 0 { filtered = make([]types.Resource, 0, len(resourceMap)) } nextVersionMap = make(map[string]string, len(resourceMap)) @@ -46,14 +46,14 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR // we can just set it here to be used for comparison later version := versionMap[name] nextVersionMap[name] = version - prevVersion, found := state.GetKnownResources()[name] + prevVersion, found := state.GetACKedResources()[name] if !found || (prevVersion != version) { filtered = append(filtered, r) } } // Compute resources for removal - for name := range state.GetKnownResources() { + for name := range state.GetACKedResources() { if _, ok := resourceMap[name]; !ok { toRemove = append(toRemove, name) } @@ -63,7 +63,7 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR // state.GetResourceVersions() may include resources no longer subscribed // In the current code this gets silently cleaned when updating the version map for name := range state.GetSubscribedResources() { - prevVersion, found := state.GetKnownResources()[name] + prevVersion, found := state.GetACKedResources()[name] if r, ok := resourceMap[name]; ok { nextVersion := versionMap[name] if prevVersion != nextVersion {