diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index 5ad8e24140..081b0a216f 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -26,7 +26,6 @@ import ( "google.golang.org/protobuf/types/known/durationpb" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" ) @@ -37,6 +36,34 @@ type Request = discovery.DiscoveryRequest // DeltaRequest is an alias for the delta discovery request type. type DeltaRequest = discovery.DeltaDiscoveryRequest +// Subscription stores the server view of the client state for a given resource type. +// This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources). +// Though the methods may return mutable parts of the state for performance reasons, +// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation. +type Subscription interface { + // ReturnedResources returns a list of resources that clients have ACK'd and their associated version. + // The versions are: + // - delta protocol: version of the specific resource set in the response + // - sotw protocol: version of the global response when the resource was last ACKed + ReturnedResources() map[string]string + + // SubscribedResources returns the list of resources currently subscribed to by the client for the type. + // For delta it keeps track of subscription updates across requests + // For sotw it is a normalized view of the last request resources + SubscribedResources() map[string]struct{} + + // IsWildcard returns whether the client has a wildcard watch. + // This considers subtleties related to the current migration of wildcard definitions within the protocol. + // More details on the behavior of wildcard are present at https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return + IsWildcard() bool + + // WatchesResources returns whether at least one of the resources provided is currently being watched by the subscription. + // It is currently only applicable to delta-xds. + // If the request is wildcard, it will always return true, + // otherwise it will compare the provided resources to the list of resources currently subscribed + WatchesResources(resourceNames map[string]struct{}) bool +} + // ConfigWatcher requests watches for configuration resources by a node, last // applied version identifier, and resource names hint. The watch should send // the responses when they are ready. The watch can be canceled by the @@ -54,7 +81,7 @@ type ConfigWatcher interface { // // Cancel is an optional function to release resources in the producer. If // provided, the consumer may call this function multiple times. - CreateWatch(*Request, stream.StreamState, chan Response) (cancel func()) + CreateWatch(*Request, Subscription, chan Response) (cancel func(), err error) // CreateDeltaWatch returns a new open incremental xDS watch. // This is the entrypoint to propagate configuration changes the @@ -66,7 +93,7 @@ type ConfigWatcher interface { // // Cancel is an optional function to release resources in the producer. If // provided, the consumer may call this function multiple times. - CreateDeltaWatch(*DeltaRequest, stream.StreamState, chan DeltaResponse) (cancel func()) + CreateDeltaWatch(*DeltaRequest, Subscription, chan DeltaResponse) (cancel func(), err error) } // ConfigFetcher fetches configuration resources from cache diff --git a/pkg/cache/v3/delta.go b/pkg/cache/v3/delta.go index deaeeb7ed1..c7920d9fd2 100644 --- a/pkg/cache/v3/delta.go +++ b/pkg/cache/v3/delta.go @@ -18,7 +18,6 @@ import ( "context" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // groups together resource-related arguments for the createDeltaResponse function @@ -28,7 +27,7 @@ type resourceContainer struct { systemVersion string } -func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources resourceContainer) *RawDeltaResponse { +func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscription, resources resourceContainer) *RawDeltaResponse { // variables to build our response with var nextVersionMap map[string]string var filtered []types.Resource @@ -37,7 +36,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St // If we are handling a wildcard request, we want to respond with all resources switch { case state.IsWildcard(): - if len(state.GetResourceVersions()) == 0 { + if len(state.ReturnedResources()) == 0 { filtered = make([]types.Resource, 0, len(resources.resourceMap)) } nextVersionMap = make(map[string]string, len(resources.resourceMap)) @@ -46,7 +45,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St // we can just set it here to be used for comparison later version := resources.versionMap[name] nextVersionMap[name] = version - prevVersion, found := state.GetResourceVersions()[name] + prevVersion, found := state.ReturnedResources()[name] if !found || (prevVersion != version) { filtered = append(filtered, r) } @@ -54,17 +53,17 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St // Compute resources for removal // The resource version can be set to "" here to trigger a removal even if never returned before - for name := range state.GetResourceVersions() { + for name := range state.ReturnedResources() { if _, ok := resources.resourceMap[name]; !ok { toRemove = append(toRemove, name) } } default: - nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames())) + nextVersionMap = make(map[string]string, len(state.SubscribedResources())) // state.GetResourceVersions() may include resources no longer subscribed // In the current code this gets silently cleaned when updating the version map - for name := range state.GetSubscribedResourceNames() { - prevVersion, found := state.GetResourceVersions()[name] + for name := range state.SubscribedResources() { + prevVersion, found := state.ReturnedResources()[name] if r, ok := resources.resourceMap[name]; ok { nextVersion := resources.versionMap[name] if prevVersion != nextVersion { diff --git a/pkg/cache/v3/delta_test.go b/pkg/cache/v3/delta_test.go index fc4ee91327..44f176db4b 100644 --- a/pkg/cache/v3/delta_test.go +++ b/pkg/cache/v3/delta_test.go @@ -3,7 +3,6 @@ package cache_test import ( "context" "fmt" - "reflect" "testing" "time" @@ -36,13 +35,15 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { // Make our initial request as a wildcard to get all resources and make sure the wildcard requesting works as intended for _, typ := range testTypes { watches[typ] = make(chan cache.DeltaResponse, 1) - c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ + sub := stream.NewSubscription(true, nil) + _, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: "node", }, TypeUrl: typ, ResourceNamesSubscribe: names[typ], - }, stream.NewStreamState(true, nil), watches[typ]) + }, sub, watches[typ]) + require.NoError(t, err) } if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil { @@ -68,17 +69,20 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { // all resources as well as individual resource removals for _, typ := range testTypes { watches[typ] = make(chan cache.DeltaResponse, 1) - state := stream.NewStreamState(false, versionMap[typ]) + sub := stream.NewSubscription(false, versionMap[typ]) + resources := []string{} for resource := range versionMap[typ] { - state.GetSubscribedResourceNames()[resource] = struct{}{} + resources = append(resources, resource) } - c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ + sub.SetResourceSubscription(resources) + _, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: "node", }, TypeUrl: typ, ResourceNamesSubscribe: names[typ], - }, state, watches[typ]) + }, sub, watches[typ]) + require.NoError(t, err) } if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) { @@ -111,36 +115,40 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { func TestDeltaRemoveResources(t *testing.T) { c := cache.NewSnapshotCache(false, group{}, logger{t: t}) watches := make(map[string]chan cache.DeltaResponse) - streams := make(map[string]*stream.StreamState) + subs := make(map[string]*stream.Subscription) + // At this stage the cache is empty, so a watch is opened for _, typ := range testTypes { watches[typ] = make(chan cache.DeltaResponse, 1) - state := stream.NewStreamState(true, make(map[string]string)) - streams[typ] = &state + sub := stream.NewSubscription(true, make(map[string]string)) + subs[typ] = &sub // We don't specify any resource name subscriptions here because we want to make sure we test wildcard // functionality. This means we should receive all resources back without requesting a subscription by name. - c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ + _, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: "node", }, TypeUrl: typ, - }, *streams[typ], watches[typ]) + }, *subs[typ], watches[typ]) + require.NoError(t, err) } - if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil { - t.Fatal(err) - } + snapshot := fixture.snapshot() + snapshot.Resources[types.Endpoint] = cache.NewResources(fixture.version, []types.Resource{ + testEndpoint, + resource.MakeEndpoint("otherCluster", 8080), + }) + require.NoError(t, c.SetSnapshot(context.Background(), key, snapshot)) for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { select { case out := <-watches[typ]: - snapshot := fixture.snapshot() assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ)) nextVersionMap := out.GetNextVersionMap() - streams[typ].SetResourceVersions(nextVersionMap) + subs[typ].SetReturnedResources(nextVersionMap) case <-time.After(time.Second): - t.Fatal("failed to receive a snapshot response") + require.Fail(t, "failed to receive a snapshot response") } }) } @@ -149,39 +157,35 @@ func TestDeltaRemoveResources(t *testing.T) { // test the removal of certain resources from a partial snapshot for _, typ := range testTypes { watches[typ] = make(chan cache.DeltaResponse, 1) - c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ + _, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: "node", }, - TypeUrl: typ, - }, *streams[typ], watches[typ]) + TypeUrl: typ, + ResponseNonce: "nonce", + }, *subs[typ], watches[typ]) + require.NoError(t, err) } - if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) { - t.Errorf("watches should be created for the latest version, saw %d watches expected %d", count, len(testTypes)) - } + assert.Equal(t, len(testTypes), c.GetStatusInfo(key).GetNumDeltaWatches(), "watches should be created for the latest version") - // set a partially versioned snapshot with no endpoints + // set a partially versioned snapshot with only one endpoint snapshot2 := fixture.snapshot() - snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{}) - if err := c.SetSnapshot(context.Background(), key, snapshot2); err != nil { - t.Fatal(err) - } + snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{ + testEndpoint, // this cluster is not changed, we do not expect it back in "resources" + }) + require.NoError(t, c.SetSnapshot(context.Background(), key, snapshot2)) // validate response for endpoints select { case out := <-watches[testTypes[0]]: - snapshot2 := fixture.snapshot() - snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{}) - assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResources(rsrc.EndpointType)) + assert.Empty(t, out.(*cache.RawDeltaResponse).Resources) + assert.Equal(t, []string{"otherCluster"}, out.(*cache.RawDeltaResponse).RemovedResources) nextVersionMap := out.GetNextVersionMap() - // make sure the version maps are different since we no longer are tracking any endpoint resources - if reflect.DeepEqual(streams[testTypes[0]].GetResourceVersions(), nextVersionMap) { - t.Fatalf("versionMap for the endpoint resource type did not change, received: %v, instead of an empty map", nextVersionMap) - } + assert.NotEqual(t, nextVersionMap, subs[testTypes[0]].ReturnedResources(), "versionMap for the endpoint resource type did not change") case <-time.After(time.Second): - t.Fatal("failed to receive snapshot response") + assert.Fail(t, "failed to receive snapshot response") } } @@ -204,13 +208,15 @@ func TestConcurrentSetDeltaWatch(t *testing.T) { t.Fatalf("snapshot failed: %s", err) } } else { - cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ + sub := stream.NewSubscription(false, make(map[string]string)) + cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: id, }, TypeUrl: rsrc.EndpointType, ResourceNamesSubscribe: []string{clusterName}, - }, stream.NewStreamState(false, make(map[string]string)), responses) + }, sub, responses) + require.NoError(t, err) defer cancel() } @@ -226,22 +232,23 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) { // Create a non-buffered channel that will block sends. watchCh := make(chan cache.DeltaResponse) - state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{names[rsrc.EndpointType][0]: {}}) - c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ + sub := stream.NewSubscription(false, nil) + sub.UpdateResourceSubscriptions([]string{names[rsrc.EndpointType][0]}, nil) + _, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: key, }, TypeUrl: rsrc.EndpointType, ResourceNamesSubscribe: names[rsrc.EndpointType], - }, state, watchCh) + }, sub, watchCh) + require.NoError(t, err) // The first time we set the snapshot without consuming from the blocking channel, so this should time out. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) defer cancel() - err := c.SetSnapshot(ctx, key, fixture.snapshot()) - require.EqualError(t, err, context.Canceled.Error()) + err = c.SetSnapshot(ctx, key, fixture.snapshot()) + assert.EqualError(t, err, context.Canceled.Error()) // Now reset the snapshot with a consuming channel. This verifies that if setting the snapshot fails, // we can retry by setting the same snapshot. In other words, we keep the watch open even if we failed @@ -270,13 +277,15 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) for _, typ := range testTypes { responses := make(chan cache.DeltaResponse, 1) - cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ + sub := stream.NewSubscription(false, make(map[string]string)) + cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: key, }, TypeUrl: typ, ResourceNamesSubscribe: names[typ], - }, stream.NewStreamState(false, make(map[string]string)), responses) + }, sub, responses) + require.NoError(t, err) // Cancel the watch cancel() diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index f7786ac4f9..c4d29aa932 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -17,6 +17,7 @@ package cache import ( "context" "errors" + "fmt" "strconv" "strings" "sync" @@ -24,7 +25,6 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/log" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) type watches = map[chan Response]struct{} @@ -106,6 +106,7 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache { versionMap: nil, version: 0, versionVector: make(map[string]uint64), + log: log.NewDefaultLogger(), } for _, opt := range opts { opt(out) @@ -164,11 +165,11 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) { } for id, watch := range cache.deltaWatches { - if !watch.StreamState.WatchesResources(modified) { + if !watch.subscription.WatchesResources(modified) { continue } - res := cache.respondDelta(watch.Request, watch.Response, watch.StreamState) + res := cache.respondDelta(watch.Request, watch.Response, watch.subscription) if res != nil { delete(cache.deltaWatches, id) } @@ -176,8 +177,8 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) { } } -func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) *RawDeltaResponse { - resp := createDeltaResponse(context.Background(), request, state, resourceContainer{ +func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, sub Subscription) *RawDeltaResponse { + resp := createDeltaResponse(context.Background(), request, sub, resourceContainer{ resourceMap: cache.resources, versionMap: cache.versionMap, systemVersion: cache.getVersion(), @@ -185,10 +186,8 @@ func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaRe // Only send a response if there were changes if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 { - if cache.log != nil { - cache.log.Debugf("[linear cache] node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t", - request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, state.IsWildcard()) - } + cache.log.Debugf("[linear cache] node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t", + request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, sub.IsWildcard()) value <- resp return resp } @@ -298,10 +297,10 @@ func (cache *LinearCache) GetResources() map[string]types.Resource { return resources } -func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, value chan Response) func() { +func (cache *LinearCache) CreateWatch(request *Request, _ Subscription, value chan Response) (func(), error) { if request.GetTypeUrl() != cache.typeURL { value <- nil - return nil + return nil, fmt.Errorf("request type %s does not match cache type %s", request.TypeUrl, cache.typeURL) } // If the version is not up to date, check whether any requested resource has // been updated between the last version and the current version. This avoids the problem @@ -338,7 +337,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va } if stale { cache.respond(value, staleResources) - return nil + return nil, nil } // Create open watches since versions are up to date. if len(request.GetResourceNames()) == 0 { @@ -347,7 +346,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va cache.mu.Lock() defer cache.mu.Unlock() delete(cache.watchAll, value) - } + }, nil } for _, name := range request.GetResourceNames() { set, exists := cache.watches[name] @@ -369,10 +368,10 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va delete(cache.watches, name) } } - } + }, nil } -func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { +func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, sub Subscription, value chan DeltaResponse) (func(), error) { cache.mu.Lock() defer cache.mu.Unlock() @@ -385,27 +384,25 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S modified[name] = struct{}{} } err := cache.updateVersionMap(modified) - if err != nil && cache.log != nil { + if err != nil { cache.log.Errorf("failed to update version map: %v", err) } } - response := cache.respondDelta(request, value, state) + response := cache.respondDelta(request, value, sub) // if respondDelta returns nil this means that there is no change in any resource version // create a new watch accordingly if response == nil { watchID := cache.nextDeltaWatchID() - if cache.log != nil { - cache.log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID, - cache.typeURL, state.GetSubscribedResourceNames(), cache.getVersion()) - } + cache.log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID, + cache.typeURL, sub.SubscribedResources(), cache.getVersion()) - cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, StreamState: state} + cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, subscription: sub} - return cache.cancelDeltaWatch(watchID) + return cache.cancelDeltaWatch(watchID), nil } - return nil + return nil, nil } func (cache *LinearCache) updateVersionMap(modified map[string]struct{}) error { diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 82478ca3aa..d3287bd398 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -189,57 +189,62 @@ func hashResource(t *testing.T, resource types.Resource) string { return v } -func createWildcardDeltaWatch(c *LinearCache, w chan DeltaResponse) { - state := stream.NewStreamState(true, nil) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) +func createWildcardDeltaWatch(c *LinearCache, w chan DeltaResponse) error { + sub := stream.NewSubscription(true, nil) + if _, err := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w); err != nil { + return err + } resp := <-w - state.SetResourceVersions(resp.GetNextVersionMap()) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) // Ensure the watch is set properly with cache values + sub.SetReturnedResources(resp.GetNextVersionMap()) + _, err := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) // Ensure the watch is set properly with cache values + return err } func TestLinearInitialResources(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, streamState, w) + _, err := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, sub, w) + require.NoError(t, err) verifyResponse(t, w, "0", 1) - c.CreateWatch(&Request{TypeUrl: testType}, streamState, w) + _, err = c.CreateWatch(&Request{TypeUrl: testType}, sub, w) + require.NoError(t, err) verifyResponse(t, w, "0", 2) checkVersionMapNotSet(t, c) } func TestLinearCornerCases(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType) err := c.UpdateResource("a", nil) - if err == nil { - t.Error("expected error on nil resource") - } + assert.Error(t, err, "expected error on nil resource") + // create an incorrect type URL request w := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: "test"}, streamState, w) + _, err = c.CreateWatch(&Request{TypeUrl: "test"}, sub, w) + assert.Error(t, err, "watch should fail to be created") select { case r := <-w: - if r != nil { - t.Error("response should be nil") - } + assert.Nil(t, r, "response should be nil") default: - t.Error("should receive nil response") + assert.Fail(t, "should receive nil response") } } func TestLinearBasic(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType) // Create watches before a resource is ready w1 := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1) + _, err := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, sub, w1) + require.NoError(t, err) mustBlock(t, w1) checkVersionMapNotSet(t, c) w := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, sub, w) + require.NoError(t, err) mustBlock(t, w) checkWatchCount(t, c, "a", 2) checkWatchCount(t, c, "b", 1) @@ -250,34 +255,40 @@ func TestLinearBasic(t *testing.T) { verifyResponse(t, w, "1", 1) // Request again, should get same response - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + _, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, sub, w) + require.NoError(t, err) checkWatchCount(t, c, "a", 0) verifyResponse(t, w, "1", 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, sub, w) + require.NoError(t, err) checkWatchCount(t, c, "a", 0) verifyResponse(t, w, "1", 1) // Add another element and update the first, response should be different require.NoError(t, c.UpdateResource("b", testResource("b"))) require.NoError(t, c.UpdateResource("a", testResource("aa"))) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + _, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, sub, w) + require.NoError(t, err) verifyResponse(t, w, "3", 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, sub, w) + require.NoError(t, err) verifyResponse(t, w, "3", 2) // Ensure the version map was not created as we only ever used stow watches checkVersionMapNotSet(t, c) } func TestLinearSetResources(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType) // Create new resources w1 := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1) + _, err := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, sub, w1) + require.NoError(t, err) mustBlock(t, w1) w2 := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w2) + _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, sub, w2) + require.NoError(t, err) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "a": testResource("a"), @@ -287,9 +298,11 @@ func TestLinearSetResources(t *testing.T) { verifyResponse(t, w2, "1", 2) // the version was only incremented once for all resources // Add another element and update the first, response should be different - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w1) + _, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, sub, w1) + require.NoError(t, err) mustBlock(t, w1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w2) + _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, sub, w2) + require.NoError(t, err) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "a": testResource("aa"), @@ -300,9 +313,11 @@ func TestLinearSetResources(t *testing.T) { verifyResponse(t, w2, "2", 3) // Delete resource - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, streamState, w1) + _, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, sub, w1) + require.NoError(t, err) mustBlock(t, w1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, streamState, w2) + _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, sub, w2) + require.NoError(t, err) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "b": testResource("b"), @@ -330,49 +345,57 @@ func TestLinearGetResources(t *testing.T) { } func TestLinearVersionPrefix(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType, WithVersionPrefix("instance1-")) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + _, err := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, sub, w) + require.NoError(t, err) verifyResponse(t, w, "instance1-0", 0) require.NoError(t, c.UpdateResource("a", testResource("a"))) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + _, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, sub, w) + require.NoError(t, err) verifyResponse(t, w, "instance1-1", 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, streamState, w) + _, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, sub, w) + require.NoError(t, err) mustBlock(t, w) checkWatchCount(t, c, "a", 1) } func TestLinearDeletion(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + _, err := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, sub, w) + require.NoError(t, err) mustBlock(t, w) checkWatchCount(t, c, "a", 1) require.NoError(t, c.DeleteResource("a")) verifyResponse(t, w, "1", 0) checkWatchCount(t, c, "a", 0) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, sub, w) + require.NoError(t, err) verifyResponse(t, w, "1", 1) checkWatchCount(t, c, "b", 0) require.NoError(t, c.DeleteResource("b")) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w) + _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, sub, w) + require.NoError(t, err) verifyResponse(t, w, "2", 0) checkWatchCount(t, c, "b", 0) } func TestLinearWatchTwo(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + _, err := c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, sub, w) + require.NoError(t, err) mustBlock(t, w) w1 := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w1) + _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, sub, w1) + require.NoError(t, err) mustBlock(t, w1) require.NoError(t, c.UpdateResource("a", testResource("aa"))) // should only get the modified resource @@ -381,20 +404,22 @@ func TestLinearWatchTwo(t *testing.T) { } func TestLinearCancel(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType) require.NoError(t, c.UpdateResource("a", testResource("a"))) // cancel watch-all w := make(chan Response, 1) - cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w) + cancel, err := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, sub, w) + require.NoError(t, err) mustBlock(t, w) checkWatchCount(t, c, "a", 1) cancel() checkWatchCount(t, c, "a", 0) // cancel watch for "a" - cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w) + cancel, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, sub, w) + require.NoError(t, err) mustBlock(t, w) checkWatchCount(t, c, "a", 1) cancel() @@ -404,10 +429,14 @@ func TestLinearCancel(t *testing.T) { w2 := make(chan Response, 1) w3 := make(chan Response, 1) w4 := make(chan Response, 1) - cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w) - cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w2) - cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w3) - cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w4) + cancel, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, sub, w) + require.NoError(t, err) + cancel2, err := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, sub, w2) + require.NoError(t, err) + cancel3, err := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, sub, w3) + require.NoError(t, err) + cancel4, err := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, sub, w4) + require.NoError(t, err) mustBlock(t, w) mustBlock(t, w2) mustBlock(t, w3) @@ -429,7 +458,7 @@ func TestLinearCancel(t *testing.T) { // TODO(mattklein123): This test requires GOMAXPROCS or -parallel >= 100. This should be // rewritten to not require that. This is not the case in the GH actions environment. func TestLinearConcurrentSetWatch(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType) n := 50 for i := 0; i < 2*n; i++ { @@ -444,12 +473,13 @@ func TestLinearConcurrentSetWatch(t *testing.T) { id2 := fmt.Sprintf("%d", i-1) t.Logf("request resources %q and %q", id, id2) value := make(chan Response, 1) - c.CreateWatch(&Request{ + _, err := c.CreateWatch(&Request{ // Only expect one to become stale ResourceNames: []string{id, id2}, VersionInfo: "0", TypeUrl: testType, - }, streamState, value) + }, sub, value) + require.NoError(t, err) // wait until all updates apply verifyResponse(t, value, "", 1) } @@ -460,20 +490,22 @@ func TestLinearConcurrentSetWatch(t *testing.T) { func TestLinearDeltaWildcard(t *testing.T) { c := NewLinearCache(testType) - state1 := stream.NewStreamState(true, map[string]string{}) + sub1 := stream.NewSubscription(true, map[string]string{}) w1 := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state1, w1) + _, err := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub1, w1) + require.NoError(t, err) mustBlockDelta(t, w1) - state2 := stream.NewStreamState(true, map[string]string{}) + sub2 := stream.NewSubscription(true, map[string]string{}) w2 := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state2, w2) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub2, w2) + require.NoError(t, err) mustBlockDelta(t, w1) checkDeltaWatchCount(t, c, 2) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hash := hashResource(t, a) - err := c.UpdateResource("a", a) - require.NoError(t, err) + err = c.UpdateResource("a", a) + assert.NoError(t, err) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w1, []resourceInfo{{"a", hash}}, nil) verifyDeltaResponse(t, w2, []resourceInfo{{"a", hash}}, nil) @@ -490,17 +522,19 @@ func TestLinearDeltaExistingResources(t *testing.T) { err = c.UpdateResource("b", b) require.NoError(t, err) - state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"b": {}, "c": {}}) // watching b and c - not interested in a + sub := stream.NewSubscription(false, nil) + sub.UpdateResourceSubscriptions([]string{"b", "c"}, nil) // watching b and c - not interested in a w := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) + require.NoError(t, err) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}, []string{}) - state = stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + sub = stream.NewSubscription(false, nil) + sub.UpdateResourceSubscriptions([]string{"a", "b"}, nil) w = make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) + require.NoError(t, err) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) } @@ -516,17 +550,19 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { err = c.UpdateResource("b", b) require.NoError(t, err) - state := stream.NewStreamState(false, map[string]string{"b": hashB}) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + sub := stream.NewSubscription(false, map[string]string{"b": hashB}) + sub.UpdateResourceSubscriptions([]string{"a", "b"}, nil) w := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) + require.NoError(t, err) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, nil) // b is up to date and shouldn't be returned - state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + sub = stream.NewSubscription(false, map[string]string{"a": hashA, "b": hashB}) + sub.UpdateResourceSubscriptions([]string{"a", "b"}, nil) w = make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) + require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{{Priority: 10}}} // new version of b @@ -550,18 +586,20 @@ func TestLinearDeltaResourceUpdate(t *testing.T) { // There is currently no delta watch checkVersionMapNotSet(t, c) - state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + sub := stream.NewSubscription(false, nil) + sub.UpdateResourceSubscriptions([]string{"a", "b"}, nil) w := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) + require.NoError(t, err) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) checkVersionMapSet(t, c) - state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + sub = stream.NewSubscription(false, map[string]string{"a": hashA, "b": hashB}) + sub.UpdateResourceSubscriptions([]string{"a", "b"}, nil) w = make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) + require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) @@ -586,17 +624,19 @@ func TestLinearDeltaResourceDelete(t *testing.T) { err = c.UpdateResource("b", b) require.NoError(t, err) - state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + sub := stream.NewSubscription(false, nil) + sub.UpdateResourceSubscriptions([]string{"a", "b"}, nil) w := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) + require.NoError(t, err) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) - state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + sub = stream.NewSubscription(false, map[string]string{"a": hashA, "b": hashB}) + sub.UpdateResourceSubscriptions([]string{"a", "b"}, nil) w = make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) + require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) @@ -611,14 +651,15 @@ func TestLinearDeltaResourceDelete(t *testing.T) { func TestLinearDeltaMultiResourceUpdates(t *testing.T) { c := NewLinearCache(testType) - state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + sub := stream.NewSubscription(false, nil) + sub.UpdateResourceSubscriptions([]string{"a", "b"}, nil) w := make(chan DeltaResponse, 1) checkVersionMapNotSet(t, c) assert.Equal(t, 0, c.NumResources()) // Initial update - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) + require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) // The version map should now be created, even if empty @@ -627,16 +668,17 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { hashA := hashResource(t, a) b := &endpoint.ClusterLoadAssignment{ClusterName: "b"} hashB := hashResource(t, b) - err := c.UpdateResources(map[string]types.Resource{"a": a, "b": b}, nil) - require.NoError(t, err) + err = c.UpdateResources(map[string]types.Resource{"a": a, "b": b}, nil) + assert.NoError(t, err) resp := <-w validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil) checkVersionMapSet(t, c) assert.Equal(t, 2, c.NumResources()) - state.SetResourceVersions(resp.GetNextVersionMap()) + sub.SetReturnedResources(resp.GetNextVersionMap()) // Multiple updates - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) + require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update @@ -653,10 +695,11 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil) checkVersionMapSet(t, c) assert.Equal(t, 2, c.NumResources()) - state.SetResourceVersions(resp.GetNextVersionMap()) + sub.SetReturnedResources(resp.GetNextVersionMap()) // Update/add/delete - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) + require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update @@ -672,10 +715,11 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}}, []string{"b"}) checkVersionMapSet(t, c) assert.Equal(t, 2, c.NumResources()) - state.SetResourceVersions(resp.GetNextVersionMap()) + sub.SetReturnedResources(resp.GetNextVersionMap()) // Re-add previously deleted watched resource - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) + require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{}} // recreate watched resource @@ -688,10 +732,10 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { validateDeltaResponse(t, resp, []resourceInfo{{"b", hashB}}, nil) // d is not watched and should not be returned checkVersionMapSet(t, c) assert.Equal(t, 2, c.NumResources()) - state.SetResourceVersions(resp.GetNextVersionMap()) + sub.SetReturnedResources(resp.GetNextVersionMap()) // Wildcard create/update - createWildcardDeltaWatch(c, w) + require.NoError(t, createWildcardDeltaWatch(c, w)) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update @@ -707,7 +751,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { assert.Equal(t, 3, c.NumResources()) // Wildcard update/delete - createWildcardDeltaWatch(c, w) + require.NoError(t, createWildcardDeltaWatch(c, w)) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update @@ -736,9 +780,10 @@ func TestLinearMixedWatches(t *testing.T) { require.NoError(t, err) assert.Equal(t, 2, c.NumResources()) - sotwState := stream.NewStreamState(false, nil) + sotwSub := stream.NewSubscription(false, nil) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwState, w) + _, err = c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, &sotwSub, w) + require.NoError(t, err) mustBlock(t, w) checkVersionMapNotSet(t, c) @@ -752,16 +797,18 @@ func TestLinearMixedWatches(t *testing.T) { verifyResponse(t, w, c.getVersion(), 1) checkVersionMapNotSet(t, c) - c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwState, w) + _, err = c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, &sotwSub, w) + require.NoError(t, err) mustBlock(t, w) checkVersionMapNotSet(t, c) - deltaState := stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) - deltaState.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + deltaSub := stream.NewSubscription(false, map[string]string{"a": hashA, "b": hashB}) + deltaSub.UpdateResourceSubscriptions([]string{"a", "b"}, nil) wd := make(chan DeltaResponse, 1) // Initial update - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, deltaState, wd) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &deltaSub, wd) + require.NoError(t, err) mustBlockDelta(t, wd) checkDeltaWatchCount(t, c, 1) checkVersionMapSet(t, c) diff --git a/pkg/cache/v3/mux.go b/pkg/cache/v3/mux.go index 9fdfb090d6..2a7a84a49d 100644 --- a/pkg/cache/v3/mux.go +++ b/pkg/cache/v3/mux.go @@ -17,8 +17,7 @@ package cache import ( "context" "errors" - - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" + "fmt" ) // MuxCache multiplexes across several caches using a classification function. @@ -37,24 +36,22 @@ type MuxCache struct { var _ Cache = &MuxCache{} -func (mux *MuxCache) CreateWatch(request *Request, state stream.StreamState, value chan Response) func() { +func (mux *MuxCache) CreateWatch(request *Request, sub Subscription, value chan Response) (func(), error) { key := mux.Classify(request) cache, exists := mux.Caches[key] if !exists { - value <- nil - return nil + return nil, fmt.Errorf("no cache defined for key %s", key) } - return cache.CreateWatch(request, state, value) + return cache.CreateWatch(request, sub, value) } -func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { +func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, sub Subscription, value chan DeltaResponse) (func(), error) { key := mux.ClassifyDelta(request) cache, exists := mux.Caches[key] if !exists { - value <- nil - return nil + return nil, fmt.Errorf("no cache defined for key %s", key) } - return cache.CreateDeltaWatch(request, state, value) + return cache.CreateDeltaWatch(request, sub, value) } func (mux *MuxCache) Fetch(context.Context, *Request) (Response, error) { diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index ebf63f5b6f..b609d19f51 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -23,7 +23,6 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/log" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // ResourceSnapshot is an abstract snapshot of a collection of resources that @@ -310,7 +309,7 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu snapshot, watch.Request, watch.Response, - watch.StreamState, + watch.subscription, ) if err != nil { return err @@ -328,7 +327,7 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu snapshot, watch.Request, watch.Response, - watch.StreamState, + watch.subscription, ) if err != nil { return err @@ -385,7 +384,7 @@ func superset(names map[string]bool, resources map[string]types.ResourceWithTTL) // CreateWatch returns a watch for an xDS request. A nil function may be // returned if an error occurs. -func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.StreamState, value chan Response) func() { +func (cache *snapshotCache) CreateWatch(request *Request, sub Subscription, value chan Response) (func(), error) { nodeID := cache.hash.ID(request.GetNode()) cache.mu.Lock() @@ -409,7 +408,7 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str } if exists { - knownResourceNames := streamState.GetKnownResourceNames(request.GetTypeUrl()) + knownResourceNames := sub.ReturnedResources() diff := []string{} for _, r := range request.GetResourceNames() { if _, ok := knownResourceNames[r]; !ok { @@ -427,9 +426,9 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil { cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.GetTypeUrl(), request.GetResourceNames(), nodeID, err) - return nil + return nil, fmt.Errorf("failed to send the response: %w", err) } - return func() {} + return func() {}, nil } } } @@ -442,7 +441,7 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str info.mu.Lock() info.watches[watchID] = ResponseWatch{Request: request, Response: value} info.mu.Unlock() - return cache.cancelWatch(nodeID, watchID) + return cache.cancelWatch(nodeID, watchID), nil } // otherwise, the watch may be responded immediately @@ -450,10 +449,10 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil { cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.GetTypeUrl(), request.GetResourceNames(), nodeID, err) - return nil + return nil, fmt.Errorf("failed to send the response: %w", err) } - return func() {} + return func() {}, nil } func (cache *snapshotCache) nextWatchID() int64 { @@ -525,7 +524,7 @@ func createResponse(ctx context.Context, request *Request, resources map[string] } // CreateDeltaWatch returns a watch for a delta xDS request which implements the Simple SnapshotCache. -func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { +func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, sub Subscription, value chan DeltaResponse) (func(), error) { nodeID := cache.hash.ID(request.GetNode()) t := request.GetTypeUrl() @@ -554,7 +553,7 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream if err != nil { cache.log.Errorf("failed to compute version for snapshot resources inline: %s", err) } - response, err := cache.respondDelta(context.Background(), snapshot, request, value, state) + response, err := cache.respondDelta(context.Background(), snapshot, request, value, sub) if err != nil { cache.log.Errorf("failed to respond with delta response: %s", err) } @@ -566,33 +565,33 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream watchID := cache.nextDeltaWatchID() if exists { - cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, state.GetSubscribedResourceNames(), nodeID, snapshot.GetVersion(t)) + cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, sub.SubscribedResources(), nodeID, snapshot.GetVersion(t)) } else { - cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, state.GetSubscribedResourceNames(), nodeID) + cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, sub.SubscribedResources(), nodeID) } - info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, StreamState: state}) - return cache.cancelDeltaWatch(nodeID, watchID) + info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, subscription: sub}) + return cache.cancelDeltaWatch(nodeID, watchID), nil } - return nil + return nil, nil } // Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change. -func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) (*RawDeltaResponse, error) { - resp := createDeltaResponse(ctx, request, state, resourceContainer{ +func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, sub Subscription) (*RawDeltaResponse, error) { + resp := createDeltaResponse(ctx, request, sub, resourceContainer{ resourceMap: snapshot.GetResources(request.GetTypeUrl()), versionMap: snapshot.GetVersionMap(request.GetTypeUrl()), systemVersion: snapshot.GetVersion(request.GetTypeUrl()), }) // Only send a response if there were changes - // We want to respond immediately for the first wildcard request in a stream, even if the response is empty + // We want to respond immediately for the first request in a subscription if it is wildcard, even if the response is empty // otherwise, envoy won't complete initialization - if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (state.IsWildcard() && state.IsFirst()) { + if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (sub.IsWildcard() && request.ResponseNonce == "") { if cache.log != nil { cache.log.Debugf("node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t", - request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, state.IsWildcard()) + request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, sub.IsWildcard()) } select { case value <- resp: diff --git a/pkg/cache/v3/simple_test.go b/pkg/cache/v3/simple_test.go index eba4cf96d9..f2c961d1b2 100644 --- a/pkg/cache/v3/simple_test.go +++ b/pkg/cache/v3/simple_test.go @@ -91,10 +91,22 @@ type logger struct { t *testing.T } -func (log logger) Debugf(format string, args ...interface{}) { log.t.Logf(format, args...) } -func (log logger) Infof(format string, args ...interface{}) { log.t.Logf(format, args...) } -func (log logger) Warnf(format string, args ...interface{}) { log.t.Logf(format, args...) } -func (log logger) Errorf(format string, args ...interface{}) { log.t.Logf(format, args...) } +func (log logger) Debugf(format string, args ...interface{}) { + log.t.Helper() + log.t.Logf(format, args...) +} +func (log logger) Infof(format string, args ...interface{}) { + log.t.Helper() + log.t.Logf(format, args...) +} +func (log logger) Warnf(format string, args ...interface{}) { + log.t.Helper() + log.t.Logf(format, args...) +} +func (log logger) Errorf(format string, args ...interface{}) { + log.t.Helper() + log.t.Logf(format, args...) +} func TestSnapshotCacheWithTTL(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -119,13 +131,14 @@ func TestSnapshotCacheWithTTL(t *testing.T) { wg := sync.WaitGroup{} // All the resources should respond immediately when version is not up to date. - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) for _, typ := range testTypes { wg.Add(1) t.Run(typ, func(t *testing.T) { defer wg.Done() value := make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, value) + _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, sub, value) + require.NoError(t, err) select { case out := <-value: if gotVersion, _ := out.GetVersion(); gotVersion != fixture.version { @@ -134,8 +147,10 @@ func TestSnapshotCacheWithTTL(t *testing.T) { if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTTL.GetResourcesAndTTL(typ)) { t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTTL.GetResourcesAndTTL(typ)) } - // Update streamState - streamState.SetKnownResourceNamesAsList(typ, out.GetRequest().GetResourceNames()) + // Update sub to track what was returned + for _, resource := range out.GetRequest().GetResourceNames() { + sub.ReturnedResources()[resource] = fixture.version + } case <-time.After(2 * time.Second): t.Errorf("failed to receive snapshot response") } @@ -154,8 +169,9 @@ func TestSnapshotCacheWithTTL(t *testing.T) { end := time.After(5 * time.Second) for { value := make(chan cache.Response, 1) - cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: fixture.version}, - streamState, value) + cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: fixture.version}, + sub, value) + require.NoError(t, err) select { case out := <-value: @@ -172,7 +188,9 @@ func TestSnapshotCacheWithTTL(t *testing.T) { updatesByType[typ]++ - streamState.SetKnownResourceNamesAsList(typ, out.GetRequest().GetResourceNames()) + for _, resource := range out.GetRequest().GetResourceNames() { + sub.ReturnedResources()[resource] = fixture.version + } case <-end: cancel() return @@ -214,9 +232,11 @@ func TestSnapshotCache(t *testing.T) { // try to get endpoints with incorrect list of names // should not receive response value := make(chan cache.Response, 1) - streamState := stream.NewStreamState(false, map[string]string{}) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}}, - streamState, value) + sub := stream.NewSubscription(false, map[string]string{}) + _, err = c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}}, + sub, value) + require.NoError(t, err) + select { case out := <-value: t.Errorf("watch for endpoints and mismatched names => got %v, want none", out) @@ -226,9 +246,10 @@ func TestSnapshotCache(t *testing.T) { for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { value := make(chan cache.Response, 1) - streamState := stream.NewStreamState(false, map[string]string{}) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, - streamState, value) + sub := stream.NewSubscription(false, map[string]string{}) + _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, + sub, value) + require.NoError(t, err) select { case out := <-value: snapshot := fixture.snapshot() @@ -279,10 +300,11 @@ func TestSnapshotCacheFetch(t *testing.T) { func TestSnapshotCacheWatch(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) watches := make(map[string]chan cache.Response) - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) for _, typ := range testTypes { watches[typ] = make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, watches[typ]) + _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, sub, watches[typ]) + require.NoError(t, err) } if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil { t.Fatal(err) @@ -298,7 +320,9 @@ func TestSnapshotCacheWatch(t *testing.T) { if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTTL(typ)) { t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTTL(typ)) } - streamState.SetKnownResourceNamesAsList(typ, out.GetRequest().GetResourceNames()) + for _, resource := range out.GetRequest().GetResourceNames() { + sub.ReturnedResources()[resource] = fixture.version + } case <-time.After(time.Second): t.Fatal("failed to receive snapshot response") } @@ -308,8 +332,9 @@ func TestSnapshotCacheWatch(t *testing.T) { // open new watches with the latest version for _, typ := range testTypes { watches[typ] = make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: fixture.version}, - streamState, watches[typ]) + _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: fixture.version}, + sub, watches[typ]) + require.NoError(t, err) } if count := c.GetStatusInfo(key).GetNumWatches(); count != len(testTypes) { t.Errorf("watches should be created for the latest version: %d", count) @@ -354,11 +379,13 @@ func TestConcurrentSetWatch(t *testing.T) { t.Fatalf("failed to set snapshot %q: %s", id, err) } } else { - streamState := stream.NewStreamState(false, map[string]string{}) - cancel := c.CreateWatch(&discovery.DiscoveryRequest{ + sub := stream.NewSubscription(false, map[string]string{}) + cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{ Node: &core.Node{Id: id}, TypeUrl: rsrc.EndpointType, - }, streamState, value) + }, sub, value) + require.NoError(t, err) + defer cancel() } }) @@ -367,10 +394,11 @@ func TestConcurrentSetWatch(t *testing.T) { func TestSnapshotCacheWatchCancel(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) for _, typ := range testTypes { value := make(chan cache.Response, 1) - cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, value) + cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, sub, value) + require.NoError(t, err) cancel() } // should be status info for the node @@ -394,16 +422,17 @@ func TestSnapshotCacheWatchTimeout(t *testing.T) { // Create a non-buffered channel that will block sends. watchCh := make(chan cache.Response) - streamState := stream.NewStreamState(false, map[string]string{}) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: names[rsrc.EndpointType]}, - streamState, watchCh) + sub := stream.NewSubscription(false, map[string]string{}) + _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: names[rsrc.EndpointType]}, + sub, watchCh) + require.NoError(t, err) // The first time we set the snapshot without consuming from the blocking channel, so this should time out. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) defer cancel() - err := c.SetSnapshot(ctx, key, fixture.snapshot()) - require.EqualError(t, err, context.Canceled.Error()) + err = c.SetSnapshot(ctx, key, fixture.snapshot()) + assert.EqualError(t, err, context.Canceled.Error()) // Now reset the snapshot with a consuming channel. This verifies that if setting the snapshot fails, // we can retry by setting the same snapshot. In other words, we keep the watch open even if we failed @@ -450,8 +479,10 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { // Request resource with name=ClusterName go func() { - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{clusterName}}, - stream.NewStreamState(false, map[string]string{}), watch) + sub := stream.NewSubscription(false, map[string]string{}) + _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{clusterName}}, + &sub, watch) + require.NoError(t, err) }() select { @@ -469,12 +500,11 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { // Request additional resource with name=clusterName2 for same version go func() { - state := stream.NewStreamState(false, map[string]string{}) - state.SetKnownResourceNames(rsrc.EndpointType, map[string]struct{}{clusterName: {}}) - c.CreateWatch(&discovery.DiscoveryRequest{ - TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version, - ResourceNames: []string{clusterName, clusterName2}, - }, state, watch) + sub := stream.NewSubscription(false, map[string]string{}) + sub.SetReturnedResources(map[string]string{clusterName: fixture.version}) + _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version, + ResourceNames: []string{clusterName, clusterName2}}, &sub, watch) + require.NoError(t, err) }() select { @@ -490,14 +520,13 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { } // Repeat request for with same version and make sure a watch is created - state := stream.NewStreamState(false, map[string]string{}) - state.SetKnownResourceNames(rsrc.EndpointType, map[string]struct{}{clusterName: {}, clusterName2: {}}) - if cancel := c.CreateWatch(&discovery.DiscoveryRequest{ - TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version, - ResourceNames: []string{clusterName, clusterName2}, - }, state, watch); cancel == nil { + sub := stream.NewSubscription(false, map[string]string{}) + sub.SetReturnedResources(map[string]string{clusterName: fixture.version, clusterName2: fixture.version}) + if cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version, + ResourceNames: []string{clusterName, clusterName2}}, &sub, watch); cancel == nil { t.Fatal("Should create a watch") } else { + require.NoError(t, err) cancel() } } @@ -624,9 +653,10 @@ func TestAvertPanicForWatchOnNonExistentSnapshot(t *testing.T) { ResourceNames: []string{"rtds"}, TypeUrl: rsrc.RuntimeType, } - ss := stream.NewStreamState(false, map[string]string{"cluster": "abcdef"}) + sub := stream.NewSubscription(false, map[string]string{"cluster": "abcdef"}) responder := make(chan cache.Response) - c.CreateWatch(req, ss, responder) + _, err := c.CreateWatch(req, &sub, responder) + require.NoError(t, err) go func() { // Wait for at least one heartbeat to occur, then set snapshot. diff --git a/pkg/cache/v3/status.go b/pkg/cache/v3/status.go index dca93e02ff..18465f6dc5 100644 --- a/pkg/cache/v3/status.go +++ b/pkg/cache/v3/status.go @@ -20,7 +20,6 @@ import ( "time" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // NodeHash computes string identifiers for Envoy nodes. @@ -101,8 +100,8 @@ type DeltaResponseWatch struct { // Response is the channel to push the delta responses to Response chan DeltaResponse - // VersionMap for the stream - StreamState stream.StreamState + // Subscription stores the current client subscription state. + subscription Subscription } // newStatusInfo initializes a status info data structure. diff --git a/pkg/server/config/config.go b/pkg/server/config/config.go index b746acfab9..8dd31a2923 100644 --- a/pkg/server/config/config.go +++ b/pkg/server/config/config.go @@ -1,15 +1,20 @@ package config +import "github.com/envoyproxy/go-control-plane/pkg/log" + // Opts for individual xDS implementations that can be // utilized through the functional opts pattern. type Opts struct { // If true respond to ADS requests with a guaranteed resource ordering Ordered bool + + Logger log.Logger } func NewOpts() Opts { return Opts{ Ordered: false, + Logger: log.NewDefaultLogger(), } } diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index b570b19b27..1cb70159e6 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -12,6 +12,7 @@ import ( core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/log" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/envoyproxy/go-control-plane/pkg/server/config" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" @@ -49,6 +50,13 @@ type server struct { opts config.Opts } +// WithLogger configures the server logger. Defaults to no logging +func WithLogger(logger log.Logger) config.XDSOption { + return func(o *config.Opts) { + o.Logger = logger + } +} + // NewServer creates a delta xDS specific server which utilizes a ConfigWatcher and delta Callbacks. func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server { s := &server{ @@ -118,7 +126,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De watch := watches.deltaWatches[typ] watch.nonce = nonce - watch.state.SetResourceVersions(resp.GetNextVersionMap()) + watch.subscription.SetReturnedResources(resp.GetNextVersionMap()) watches.deltaWatches[typ] = watch return nil } @@ -204,21 +212,24 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De // cancel existing watch to (re-)request a newer version watch, ok := watches.deltaWatches[typeURL] if !ok { - // Initialize the state of the stream. - // Since there was no previous state, we know we're handling the first request of this type + // Initialize the state of the type subscription. + // Since there was no previous subscription, we know we're handling the first request of this type // so we set the initial resource versions if we have any. - // We also set the stream as wildcard based on its legacy meaning (no resource name sent in resource_names_subscribe). - // If the state starts with this legacy mode, adding new resources will not unsubscribe from wildcard. + // We also set the subscription as wildcard based on its legacy meaning (no resource name sent in resource_names_subscribe). + // If the subscription starts with this legacy mode, adding new resources will not unsubscribe from wildcard. // It can still be done by explicitly unsubscribing from "*" - watch.state = stream.NewStreamState(len(req.GetResourceNamesSubscribe()) == 0, req.GetInitialResourceVersions()) + watch.subscription = stream.NewSubscription(len(req.GetResourceNamesSubscribe()) == 0, req.GetInitialResourceVersions()) } else { watch.Cancel() } - s.subscribe(req.GetResourceNamesSubscribe(), &watch.state) - s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state) + watch.subscription.UpdateResourceSubscriptions(req.GetResourceNamesSubscribe(), req.GetResourceNamesUnsubscribe()) - watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watches.deltaMuxedResponses) + var err error + watch.cancel, err = s.cache.CreateDeltaWatch(req, watch.subscription, watches.deltaMuxedResponses) + if err != nil { + return err + } watches.deltaWatches[typeURL] = watch } } @@ -249,40 +260,3 @@ func (s *server) DeltaStreamHandler(str stream.DeltaStream, typeURL string) erro return s.processDelta(str, reqCh, typeURL) } - -// When we subscribe, we just want to make the cache know we are subscribing to a resource. -// Even if the stream is wildcard, we keep the list of explicitly subscribed resources as the wildcard subscription can be discarded later on. -func (s *server) subscribe(resources []string, streamState *stream.StreamState) { - sv := streamState.GetSubscribedResourceNames() - for _, resource := range resources { - if resource == "*" { - streamState.SetWildcard(true) - continue - } - sv[resource] = struct{}{} - } -} - -// Unsubscriptions remove resources from the stream's subscribed resource list. -// If a client explicitly unsubscribes from a wildcard request, the stream is updated and now watches only subscribed resources. -func (s *server) unsubscribe(resources []string, streamState *stream.StreamState) { - sv := streamState.GetSubscribedResourceNames() - for _, resource := range resources { - if resource == "*" { - streamState.SetWildcard(false) - continue - } - if _, ok := sv[resource]; ok && streamState.IsWildcard() { - // The XDS protocol states that: - // * if a watch is currently wildcard - // * a resource is explicitly unsubscribed by name - // Then the control-plane must return in the response whether the resource is removed (if no longer present for this node) - // or still existing. In the latter case the entire resource must be returned, same as if it had been created or updated - // To achieve that, we mark the resource as having been returned with an empty version. While creating the response, the cache will either: - // * detect the version change, and return the resource (as an update) - // * detect the resource deletion, and set it as removed in the response - streamState.GetResourceVersions()[resource] = "" - } - delete(sv, resource) - } -} diff --git a/pkg/server/delta/v3/watches.go b/pkg/server/delta/v3/watches.go index 63c4c2d38d..98e4991b49 100644 --- a/pkg/server/delta/v3/watches.go +++ b/pkg/server/delta/v3/watches.go @@ -41,7 +41,7 @@ type watch struct { cancel func() nonce string - state stream.StreamState + subscription stream.Subscription } // Cancel calls terminate and cancel diff --git a/pkg/server/sotw/v3/ads.go b/pkg/server/sotw/v3/ads.go index bbb6dd4b20..8139319f5b 100644 --- a/pkg/server/sotw/v3/ads.go +++ b/pkg/server/sotw/v3/ads.go @@ -8,28 +8,13 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // process handles a bi-di stream request func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRequest, defaultTypeURL string) error { - // We make a responder channel here so we can multiplex responses from the dynamic channels. - sw.watches.addWatch(resource.AnyType, &watch{ - // Create a buffered channel the size of the known resource types. - response: make(chan cache.Response, types.UnknownType), - cancel: func() { - close(sw.watches.responders[resource.AnyType].response) - }, - }) - - process := func(resp cache.Response) error { - nonce, err := sw.send(resp) - if err != nil { - return err - } - - sw.watches.responders[resp.GetRequest().GetTypeUrl()].nonce = nonce - return nil - } + // Create a buffered channel the size of the known resource types. + respChan := make(chan cache.Response, types.UnknownType) // Instead of creating a separate channel for each incoming request and abandoning the old one // This algorithm uses (and reuses) a single channel for all request types and guarantees @@ -42,9 +27,9 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe for { select { // We watch the multiplexed ADS channel for incoming responses. - case res := <-sw.watches.responders[resource.AnyType].response: + case res := <-respChan: if res.GetRequest().GetTypeUrl() != typeURL { - if err := process(res); err != nil { + if err := sw.send(res); err != nil { return err } } @@ -62,9 +47,8 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe select { case <-s.ctx.Done(): return nil - // We only watch the multiplexed channel since all values will come through from process. - case res := <-sw.watches.responders[resource.AnyType].response: - if err := process(res); err != nil { + case res := <-respChan: + if err := sw.send(res); err != nil { return status.Errorf(codes.Unavailable, err.Error()) } case req, ok := <-reqCh: @@ -85,9 +69,6 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe req.Node = sw.node } - // Nonces can be reused across streams; we verify nonce only if nonce is not initialized. - nonce := req.GetResponseNonce() - // type URL is required for ADS but is implicit for xDS if defaultTypeURL == resource.AnyType { if req.GetTypeUrl() == "" { @@ -101,40 +82,47 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe } } - if lastResponse, ok := sw.lastDiscoveryResponses[req.GetTypeUrl()]; ok { - if lastResponse.nonce == "" || lastResponse.nonce == nonce { - // Let's record Resource names that a client has received. - sw.streamState.SetKnownResourceNames(req.GetTypeUrl(), lastResponse.resources) + typeURL := req.GetTypeUrl() + var subscription stream.Subscription + w, ok := sw.watches.responders[typeURL] + if ok { + if w.nonce != "" && req.GetResponseNonce() != w.nonce { + // The request does not match the stream nonce, ignore it as per + // https://www.envoyproxy.io/docs/envoy/v1.28.0/api-docs/xds_protocol#resource-updates + // Ignore this request and wait for the next one + // This behavior is being discussed in https://github.com/envoyproxy/envoy/issues/10363 + // as it might create a race in edge cases, but it matches the current protocol defintion + s.opts.Logger.Debugf("[sotw ads] Skipping request as nonce is stale for %s", typeURL) + break } - } - typeURL := req.GetTypeUrl() - // Use the multiplexed channel for new watches. - responder := sw.watches.responders[resource.AnyType].response - if w, ok := sw.watches.responders[typeURL]; ok { - // We've found a pre-existing watch, lets check and update if needed. - // If these requirements aren't satisfied, leave an open watch. - if w.nonce == "" || w.nonce == nonce { - w.close() - - // Only process if we have an existing watch otherwise go ahead and create. - if err := processAllExcept(typeURL); err != nil { - return err - } + // We found an existing watch + // Close it to ensure the Cache will not reply to it while we modify the subscription state + w.close() - sw.watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, sw.streamState, responder), - response: responder, - }) + // Only process if we have an existing watch otherwise go ahead and create. + if err := processAllExcept(typeURL); err != nil { + return err } + + subscription = w.sub } else { - // No pre-existing watch exists, let's create one. - // We need to precompute the watches first then open a watch in the cache. - sw.watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, sw.streamState, responder), - response: responder, - }) + s.opts.Logger.Debugf("[sotw ads] New subscription for type %s and stream %d", typeURL, sw.ID) + subscription = stream.NewSubscription(len(req.ResourceNames) == 0, nil) } + + subscription.SetResourceSubscription(req.GetResourceNames()) + + cancel, err := s.cache.CreateWatch(req, subscription, respChan) + if err != nil { + return err + } + + sw.watches.addWatch(typeURL, &watch{ + cancel: cancel, + response: respChan, + sub: subscription, + }) } } } diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index f5be0c57a9..173c3f9a8b 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -18,12 +18,14 @@ package sotw import ( "context" "errors" + "fmt" "strconv" "sync/atomic" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/log" "github.com/envoyproxy/go-control-plane/pkg/server/config" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) @@ -65,6 +67,13 @@ func WithOrderedADS() config.XDSOption { } } +// WithLogger configures the server logger. Defaults to no logging +func WithLogger(logger log.Logger) config.XDSOption { + return func(o *config.Opts) { + o.Logger = logger + } +} + type server struct { cache cache.ConfigWatcher callbacks Callbacks @@ -88,43 +97,47 @@ type streamWrapper struct { callbacks Callbacks // callbacks for performing actions through stream lifecycle node *core.Node // registered xDS client - - // The below fields are used for tracking resource - // cache state and should be maintained per stream. - streamState stream.StreamState - lastDiscoveryResponses map[string]lastDiscoveryResponse } // Send packages the necessary resources before sending on the gRPC stream, // and sets the current state of the world. -func (s *streamWrapper) send(resp cache.Response) (string, error) { +func (s *streamWrapper) send(resp cache.Response) error { if resp == nil { - return "", errors.New("missing response") + return errors.New("missing response") } out, err := resp.GetDiscoveryResponse() if err != nil { - return "", err + return err } // increment nonce and convert it to base10 out.Nonce = strconv.FormatInt(atomic.AddInt64(&s.nonce, 1), 10) - lastResponse := lastDiscoveryResponse{ - nonce: out.GetNonce(), - resources: make(map[string]struct{}), + version, err := resp.GetVersion() + if err != nil { + return err } + + w, ok := s.watches.responders[resp.GetRequest().GetTypeUrl()] + if !ok { + return fmt.Errorf("no current watch for %s", resp.GetRequest().GetTypeUrl()) + } + + w.nonce = out.Nonce + // ToDo(valerian-roche): properly return the resources actually sent to the client + resources := make(map[string]string, len(resp.GetRequest().GetResourceNames())) for _, r := range resp.GetRequest().GetResourceNames() { - lastResponse.resources[r] = struct{}{} + resources[r] = version } - s.lastDiscoveryResponses[resp.GetRequest().GetTypeUrl()] = lastResponse + w.sub.SetReturnedResources(resources) // Register with the callbacks provided that we are sending the response. if s.callbacks != nil { s.callbacks.OnStreamResponse(resp.GetContext(), s.ID, resp.GetRequest(), out) } - return out.GetNonce(), s.stream.Send(out) + return s.stream.Send(out) } // Shutdown closes all open watches, and notifies API consumers the stream has closed. @@ -135,15 +148,6 @@ func (s *streamWrapper) shutdown() { } } -// Discovery response that is sent over GRPC stream. -// We need to record what resource names are already sent to a client -// So if the client requests a new name we can respond back -// regardless current snapshot version (even if it is not changed yet) -type lastDiscoveryResponse struct { - nonce string - resources map[string]struct{} -} - // StreamHandler converts a blocking read call to channels and initiates stream processing func (s *server) StreamHandler(stream stream.Stream, typeURL string) error { // a channel for receiving incoming requests diff --git a/pkg/server/sotw/v3/watches.go b/pkg/server/sotw/v3/watches.go index d781f663e6..43b3bedabe 100644 --- a/pkg/server/sotw/v3/watches.go +++ b/pkg/server/sotw/v3/watches.go @@ -7,6 +7,7 @@ import ( discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // watches for all xDS resource types @@ -63,8 +64,11 @@ func (w *watches) recompute(ctx context.Context, req <-chan *discovery.Discovery // watch contains the necessary modifiable data for receiving resource responses type watch struct { cancel func() - nonce string response chan cache.Response + + sub stream.Subscription + // Nonce of the latest response sent for this type + nonce string } // close cancels an open watch diff --git a/pkg/server/sotw/v3/xds.go b/pkg/server/sotw/v3/xds.go index 3b24dec409..cea848601d 100644 --- a/pkg/server/sotw/v3/xds.go +++ b/pkg/server/sotw/v3/xds.go @@ -26,9 +26,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque node: &core.Node{}, // node may only be set on the first discovery request // a collection of stack allocated watches per request type. - watches: newWatches(), - streamState: stream.NewStreamState(false, map[string]string{}), - lastDiscoveryResponses: make(map[string]lastDiscoveryResponse), + watches: newWatches(), } // cleanup once our stream has ended. @@ -40,6 +38,22 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque } } + // type URL is required for ADS but is implicit for xDS + if defaultTypeURL == resource.AnyType && s.opts.Ordered { + // When using ADS we need to order responses. + // This is guaranteed in the xDS protocol specification + // as ADS is required to be eventually consistent. + // More details can be found here if interested: + // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#eventual-consistency-considerations + + // Trigger a different code path specifically for ADS. + // We want resource ordering so things don't get sent before they should. + // This is a blocking call and will exit the process function + // on successful completion. + s.opts.Logger.Debugf("[sotw] Switching to ordered ADS implementation for stream %d", sw.ID) + return s.processADS(&sw, reqCh, defaultTypeURL) + } + // do an initial recompute so we can load the first 2 channels: // <-reqCh // s.ctx.Done() @@ -76,36 +90,11 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque req.Node = sw.node } - // nonces can be reused across streams; we verify nonce only if nonce is not initialized - nonce := req.GetResponseNonce() - // type URL is required for ADS but is implicit for xDS if defaultTypeURL == resource.AnyType { if req.GetTypeUrl() == "" { return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") } - - // When using ADS we need to order responses. - // This is guaranteed in the xDS protocol specification - // as ADS is required to be eventually consistent. - // More details can be found here if interested: - // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#eventual-consistency-considerations - if s.opts.Ordered { - // send our first request on the stream again so it doesn't get - // lost in processing on the new control loop - // There's a risk (albeit very limited) that we'd end up handling requests in the wrong order here. - // If envoy is using ADS for endpoints, and clusters are added in short sequence, - // the following request might include a new cluster and be discarded as the previous one will be handled after. - go func() { - reqCh <- req - }() - - // Trigger a different code path specifically for ADS. - // We want resource ordering so things don't get sent before they should. - // This is a blocking call and will exit the process function - // on successful completion. - return s.processADS(&sw, reqCh, defaultTypeURL) - } } else if req.GetTypeUrl() == "" { req.TypeUrl = defaultTypeURL } @@ -116,34 +105,43 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque } } - if lastResponse, ok := sw.lastDiscoveryResponses[req.GetTypeUrl()]; ok { - if lastResponse.nonce == "" || lastResponse.nonce == nonce { - // Let's record Resource names that a client has received. - sw.streamState.SetKnownResourceNames(req.GetTypeUrl(), lastResponse.resources) + typeURL := req.GetTypeUrl() + var subscription stream.Subscription + w, ok := sw.watches.responders[typeURL] + if ok { + if w.nonce != "" && req.GetResponseNonce() != w.nonce { + // The request does not match the stream nonce, ignore it as per + // https://www.envoyproxy.io/docs/envoy/v1.28.0/api-docs/xds_protocol#resource-updates + // Ignore this request and wait for the next one + // This behavior is being discussed in https://github.com/envoyproxy/envoy/issues/10363 + // as it might create a race in edge cases, but it matches the current protocol defintion + s.opts.Logger.Debugf("[sotw ads] Skipping request as nonce is stale for %s", typeURL) + break } + + // We found an existing watch + // Close it to ensure the Cache will not reply to it while we modify the subscription state + w.close() + + subscription = w.sub + } else { + s.opts.Logger.Debugf("[sotw] New subscription for type %s and stream %d", typeURL, sw.ID) + subscription = stream.NewSubscription(len(req.ResourceNames) == 0, nil) } - typeURL := req.GetTypeUrl() + subscription.SetResourceSubscription(req.GetResourceNames()) + responder := make(chan cache.Response, 1) - if w, ok := sw.watches.responders[typeURL]; ok { - // We've found a pre-existing watch, lets check and update if needed. - // If these requirements aren't satisfied, leave an open watch. - if w.nonce == "" || w.nonce == nonce { - w.close() - - sw.watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, sw.streamState, responder), - response: responder, - }) - } - } else { - // No pre-existing watch exists, let's create one. - // We need to precompute the watches first then open a watch in the cache. - sw.watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, sw.streamState, responder), - response: responder, - }) + + cancel, err := s.cache.CreateWatch(req, subscription, responder) + if err != nil { + return err } + sw.watches.addWatch(typeURL, &watch{ + cancel: cancel, + response: responder, + sub: subscription, + }) // Recompute the dynamic select cases for this stream. sw.watches.recompute(s.ctx, reqCh) @@ -155,12 +153,10 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque } res := value.Interface().(cache.Response) - nonce, err := sw.send(res) + err := sw.send(res) if err != nil { return err } - - sw.watches.responders[res.GetRequest().GetTypeUrl()].nonce = nonce } } } diff --git a/pkg/server/stream/v3/stream.go b/pkg/server/stream/v3/stream.go index 1664a941e0..b999d40c78 100644 --- a/pkg/server/stream/v3/stream.go +++ b/pkg/server/stream/v3/stream.go @@ -21,119 +21,3 @@ type DeltaStream interface { Send(*discovery.DeltaDiscoveryResponse) error Recv() (*discovery.DeltaDiscoveryRequest, error) } - -// StreamState will keep track of resource cache state per type on a stream. -type StreamState struct { // nolint:golint,revive - // Indicates whether the delta stream currently has a wildcard watch - wildcard bool - - // Provides the list of resources explicitly requested by the client - // This list might be non-empty even when set as wildcard - subscribedResourceNames map[string]struct{} - - // ResourceVersions contains a hash of the resource as the value and the resource name as the key. - // This field stores the last state sent to the client. - resourceVersions map[string]string - - // knownResourceNames contains resource names that a client has received previously (SOTW). - knownResourceNames map[string]map[string]struct{} - - // First indicates whether the StreamState has been modified since its creation - first bool - - // Ordered indicates whether we want an ordered ADS stream or not - ordered bool -} - -// NewStreamState initializes a stream state. -func NewStreamState(wildcard bool, initialResourceVersions map[string]string) StreamState { - state := StreamState{ - wildcard: wildcard, - subscribedResourceNames: map[string]struct{}{}, - resourceVersions: initialResourceVersions, - first: true, - knownResourceNames: map[string]map[string]struct{}{}, - ordered: false, // Ordered comes from the first request since that's when we discover if they want ADS - } - - if initialResourceVersions == nil { - state.resourceVersions = make(map[string]string) - } - - return state -} - -// GetSubscribedResourceNames returns the list of resources currently explicitly subscribed to -// If the request is set to wildcard it may be empty -// Currently populated only when using delta-xds -func (s *StreamState) GetSubscribedResourceNames() map[string]struct{} { - return s.subscribedResourceNames -} - -// SetSubscribedResourceNames is setting the list of resources currently explicitly subscribed to -// It is decorrelated from the wildcard state of the stream -// Currently used only when using delta-xds -func (s *StreamState) SetSubscribedResourceNames(subscribedResourceNames map[string]struct{}) { - s.subscribedResourceNames = subscribedResourceNames -} - -// WatchesResources returns whether at least one of the resource provided is currently watch by the stream -// It is currently only applicable to delta-xds -// If the request is wildcard, it will always return true -// Otherwise it will compare the provided resources to the list of resources currently subscribed -func (s *StreamState) WatchesResources(resourceNames map[string]struct{}) bool { - if s.IsWildcard() { - return true - } - for resourceName := range resourceNames { - if _, ok := s.subscribedResourceNames[resourceName]; ok { - return true - } - } - return false -} - -func (s *StreamState) SetWildcard(wildcard bool) { - s.wildcard = wildcard -} - -// GetResourceVersions returns a map of current resources grouped by type URL. -func (s *StreamState) GetResourceVersions() map[string]string { - return s.resourceVersions -} - -// SetResourceVersions sets a list of resource versions by type URL and removes the flag -// of "first" since we can safely assume another request has come through the stream. -func (s *StreamState) SetResourceVersions(resourceVersions map[string]string) { - s.first = false - s.resourceVersions = resourceVersions -} - -// IsFirst returns whether or not the state of the stream is based upon the initial request. -func (s *StreamState) IsFirst() bool { - return s.first -} - -// IsWildcard returns whether or not an xDS client requested in wildcard mode on the initial request. -func (s *StreamState) IsWildcard() bool { - return s.wildcard -} - -// GetKnownResourceNames returns the current known list of resources on a SOTW stream. -func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} { - return s.knownResourceNames[url] -} - -// SetKnownResourceNames sets a list of resource names in a stream utilizing the SOTW protocol. -func (s *StreamState) SetKnownResourceNames(url string, names map[string]struct{}) { - s.knownResourceNames[url] = names -} - -// SetKnownResourceNamesAsList is a helper function to set resource names as a slice input. -func (s *StreamState) SetKnownResourceNamesAsList(url string, names []string) { - m := map[string]struct{}{} - for _, name := range names { - m[name] = struct{}{} - } - s.knownResourceNames[url] = m -} diff --git a/pkg/server/stream/v3/subscription.go b/pkg/server/stream/v3/subscription.go new file mode 100644 index 0000000000..3a308d8a51 --- /dev/null +++ b/pkg/server/stream/v3/subscription.go @@ -0,0 +1,164 @@ +package stream + +const ( + explicitWildcard = "*" +) + +// Subscription stores the server view of a given type subscription in a stream. +type Subscription struct { + // wildcard indicates if the subscription currently has a wildcard watch. + wildcard bool + + // allowLegacyWildcard indicates that the stream never provided any resource + // and is de facto wildcard. + // As soon as a resource or an explicit subscription to wildcard is provided, + // this flag will be set to false + allowLegacyWildcard bool + + // subscribedResourceNames provides the resources explicitly requested by the client + // This list might be non-empty even when set as wildcard. + subscribedResourceNames map[string]struct{} + + // returnedResources contains the resources acknowledged by the client and the acknowledged versions. + returnedResources map[string]string +} + +// NewSubscription initializes a subscription state. +func NewSubscription(wildcard bool, initialResourceVersions map[string]string) Subscription { + state := Subscription{ + wildcard: wildcard, + allowLegacyWildcard: wildcard, + subscribedResourceNames: map[string]struct{}{}, + returnedResources: initialResourceVersions, + } + + if initialResourceVersions == nil { + state.returnedResources = make(map[string]string) + } + + return state +} + +// SetResourceSubscription updates the subscribed resources (including the wildcard state) +// based on the full state of subscribed resources provided in the request +// Used in sotw subscriptions +// Behavior is based on +// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return +func (s *Subscription) SetResourceSubscription(subscribed []string) { + if s.allowLegacyWildcard { + if len(subscribed) == 0 { + // We were wildcard based on legacy behavior and still don't request any resource + // The watch remains wildcard + return + } + + // A resource was provided (might be an explicit wildcard) + // Documentation states that we should no longer allow to fallback to the previous case + // and no longer setting wildcard would no longer subscribe to anything + s.allowLegacyWildcard = false + } + + subscribedResources := make(map[string]struct{}, len(subscribed)) + explicitWildcardSet := false + for _, resource := range subscribed { + if resource == explicitWildcard { + explicitWildcardSet = true + } else { + subscribedResources[resource] = struct{}{} + } + } + + // Explicit subscription to wildcard as we are not in legacy wildcard behavior + s.wildcard = explicitWildcardSet + s.subscribedResourceNames = subscribedResources +} + +// UpdateResourceSubscriptions updates the subscribed resources (including the wildcard state) +// based on newly subscribed or unsubscribed resources +// Used in delta subscriptions +func (s *Subscription) UpdateResourceSubscriptions(subscribed []string, unsubscribed []string) { + // Handles legacy wildcard behavior first to exit if we are still in this behavior + if s.allowLegacyWildcard { + // The protocol (as of v1.29.0) only references subscribed as triggering + // exiting legacy wildcard behavior, so we currently not check unsubscribed + if len(subscribed) == 0 { + // We were wildcard based on legacy behavior and still don't request any resource + // The watch remains wildcard + return + } + + // A resource was provided (might be an explicit wildcard) + // Documentation states that we should no longer allow to fallback to the previous case + // and no longer setting wildcard would no longer subscribe to anything + // The watch does remain wildcard if not explicitly unsubscribed (from the example in + // https://www.envoyproxy.io/docs/envoy/v1.29.0/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return) + s.allowLegacyWildcard = false + } + + // Handle subscriptions first + for _, resource := range subscribed { + if resource == explicitWildcard { + s.wildcard = true + continue + } + s.subscribedResourceNames[resource] = struct{}{} + } + + // Then unsubscriptions + for _, resource := range unsubscribed { + if resource == explicitWildcard { + s.wildcard = false + continue + } + if _, ok := s.subscribedResourceNames[resource]; ok && s.wildcard { + // The XDS protocol states that: + // * if a watch is currently wildcard + // * a resource is explicitly unsubscribed by name + // Then the control-plane must return in the response whether the resource is removed (if no longer present for this node) + // or still existing. In the latter case the entire resource must be returned, same as if it had been created or updated + // To achieve that, we mark the resource as having been returned with an empty version. While creating the response, the cache will either: + // * detect the version change, and return the resource (as an update) + // * detect the resource deletion, and set it as removed in the response + s.returnedResources[resource] = "" + } + delete(s.subscribedResourceNames, resource) + } +} + +// SubscribedResources returns the list of resources currently explicitly subscribed to +// If the request is set to wildcard it may be empty +func (s Subscription) SubscribedResources() map[string]struct{} { + return s.subscribedResourceNames +} + +// IsWildcard returns whether or not the subscription currently has a wildcard watch +func (s Subscription) IsWildcard() bool { + return s.wildcard +} + +// WatchesResources returns whether at least one of the resources provided is currently being watched by the subscription. +// If the request is wildcard, it will always return true, +// otherwise it will compare the provided resources to the list of resources currently subscribed +func (s Subscription) WatchesResources(resourceNames map[string]struct{}) bool { + if s.wildcard { + return true + } + for resourceName := range resourceNames { + if _, ok := s.subscribedResourceNames[resourceName]; ok { + return true + } + } + return false +} + +// ReturnedResources returns the list of resources returned to the client +// and their version +func (s Subscription) ReturnedResources() map[string]string { + return s.returnedResources +} + +// SetReturnedResources sets a list of resource versions currently known by the client +// The cache can use this state to compute resources added/updated/deleted +func (s *Subscription) SetReturnedResources(resourceVersions map[string]string) { + s.returnedResources = resourceVersions +} diff --git a/pkg/server/v3/delta_test.go b/pkg/server/v3/delta_test.go index 72c2b74075..7215287143 100644 --- a/pkg/server/v3/delta_test.go +++ b/pkg/server/v3/delta_test.go @@ -16,12 +16,11 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" ) -func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryRequest, state stream.StreamState, out chan cache.DeltaResponse) func() { +func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryRequest, state cache.Subscription, out chan cache.DeltaResponse) (func(), error) { config.deltaCounts[req.GetTypeUrl()] = config.deltaCounts[req.GetTypeUrl()] + 1 // This is duplicated from pkg/cache/v3/delta.go as private there @@ -38,7 +37,7 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR // If we are handling a wildcard request, we want to respond with all resources switch { case state.IsWildcard(): - if len(state.GetResourceVersions()) == 0 { + if len(state.ReturnedResources()) == 0 { filtered = make([]types.Resource, 0, len(resourceMap)) } nextVersionMap = make(map[string]string, len(resourceMap)) @@ -47,24 +46,24 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR // we can just set it here to be used for comparison later version := versionMap[name] nextVersionMap[name] = version - prevVersion, found := state.GetResourceVersions()[name] + prevVersion, found := state.ReturnedResources()[name] if !found || (prevVersion != version) { filtered = append(filtered, r) } } // Compute resources for removal - for name := range state.GetResourceVersions() { + for name := range state.ReturnedResources() { if _, ok := resourceMap[name]; !ok { toRemove = append(toRemove, name) } } default: - nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames())) + nextVersionMap = make(map[string]string, len(state.SubscribedResources())) // state.GetResourceVersions() may include resources no longer subscribed // In the current code this gets silently cleaned when updating the version map - for name := range state.GetSubscribedResourceNames() { - prevVersion, found := state.GetResourceVersions()[name] + for name := range state.SubscribedResources() { + prevVersion, found := state.ReturnedResources()[name] if r, ok := resourceMap[name]; ok { nextVersion := versionMap[name] if prevVersion != nextVersion { @@ -89,10 +88,10 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR config.deltaWatches++ return func() { config.deltaWatches-- - } + }, nil } - return nil + return nil, nil } type mockDeltaStream struct { diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index d3c4e0f81b..ed0f530ff6 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -34,7 +34,6 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/v3" rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" ) @@ -50,18 +49,19 @@ type mockConfigWatcher struct { mu *sync.RWMutex } -func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, _ stream.StreamState, out chan cache.Response) func() { - config.counts[req.GetTypeUrl()] = config.counts[req.GetTypeUrl()] + 1 - if len(config.responses[req.GetTypeUrl()]) > 0 { - out <- config.responses[req.GetTypeUrl()][0] - config.responses[req.GetTypeUrl()] = config.responses[req.GetTypeUrl()][1:] +func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, _ cache.Subscription, out chan cache.Response) (func(), error) { + typ := req.GetTypeUrl() + config.counts[typ] = config.counts[typ] + 1 + if len(config.responses[typ]) > 0 { + out <- config.responses[typ][0] + config.responses[typ] = config.responses[typ][1:] } else { config.watches++ return func() { config.watches-- - } + }, nil } - return nil + return nil, nil } func (config *mockConfigWatcher) Fetch(_ context.Context, req *discovery.DiscoveryRequest) (cache.Response, error) {