From db72c1810d4b9f16310f4d88366fb69049ff6381 Mon Sep 17 00:00:00 2001
From: Valerian Roche <valerian.roche@datadoghq.com>
Date: Tue, 6 Jun 2023 23:25:29 -0400
Subject: [PATCH] Rename ClientState to SubscriptionState. Ensure
 WatchesResources is also compatible with sotw

Signed-off-by: Valerian Roche <valerian.roche@datadoghq.com>
---
 pkg/cache/v3/cache.go                |  19 ++-
 pkg/cache/v3/delta.go                |   2 +-
 pkg/cache/v3/delta_test.go           |  39 +++--
 pkg/cache/v3/linear.go               |  25 +--
 pkg/cache/v3/linear_test.go          | 225 ++++++++++++++++-----------
 pkg/cache/v3/mux.go                  |   9 +-
 pkg/cache/v3/simple.go               |  24 +--
 pkg/cache/v3/simple_test.go          |  68 ++++----
 pkg/cache/v3/status.go               |  18 +--
 pkg/server/delta/v3/server.go        |  14 +-
 pkg/server/delta/v3/watches.go       |   2 +-
 pkg/server/sotw/v3/ads.go            |  27 +++-
 pkg/server/sotw/v3/server.go         |   2 +-
 pkg/server/sotw/v3/xds.go            |  51 +++++-
 pkg/server/stream/v3/stream.go       |  66 --------
 pkg/server/stream/v3/subscription.go |  81 ++++++++++
 pkg/server/v3/delta_test.go          |   6 +-
 pkg/server/v3/server_test.go         |   6 +-
 18 files changed, 413 insertions(+), 271 deletions(-)
 create mode 100644 pkg/server/stream/v3/subscription.go

diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go
index 6174073802..b26e0f5bc8 100644
--- a/pkg/cache/v3/cache.go
+++ b/pkg/cache/v3/cache.go
@@ -36,12 +36,12 @@ type Request = discovery.DiscoveryRequest
 // DeltaRequest is an alias for the delta discovery request type.
 type DeltaRequest = discovery.DeltaDiscoveryRequest
 
-// ClientState provides additional data on the client knowledge for the type matching the request
+// SubscriptionState provides additional data on the client knowledge for the type matching the request
 // This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources)
 // Though the methods may return mutable parts of the state for performance reasons,
 // the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation
-type ClientState interface {
-	// GetKnownResources returns the list of resources the clients has ACKed and their associated version.
+type SubscriptionState interface {
+	// GetKnownResources returns a list of resources that the client has ACK'd and their associated version.
 	// The versions are:
 	//  - delta protocol: version of the specific resource set in the response
 	//  - sotw protocol: version of the global response when the resource was last ACKed
@@ -53,8 +53,15 @@ type ClientState interface {
 	GetSubscribedResources() map[string]struct{}
 
 	// IsWildcard returns whether the client has a wildcard watch.
-	// This considers subtilities related to the current migration of wildcard definition within the protocol.
+	// This considers subtleties related to the current migration of wildcard definitions within the protocol.
+	// More details on the behavior of wildcard are present at https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return
 	IsWildcard() bool
+
+	// WatchesResources returns whether at least one of the resources provided is currently being watched by the subscription.
+	// It is currently only applicable to delta-xds.
+	// If the request is wildcard, it will always return true,
+	// otherwise it will compare the provided resources to the list of resources currently subscribed
+	WatchesResources(resourceNames map[string]struct{}) bool
 }
 
 // ConfigWatcher requests watches for configuration resources by a node, last
@@ -74,7 +81,7 @@ type ConfigWatcher interface {
 	//
 	// Cancel is an optional function to release resources in the producer. If
 	// provided, the consumer may call this function multiple times.
-	CreateWatch(*Request, ClientState, chan Response) (cancel func())
+	CreateWatch(*Request, SubscriptionState, chan Response) (cancel func(), err error)
 
 	// CreateDeltaWatch returns a new open incremental xDS watch.
 	// This is the entrypoint to propagate configuration changes the
@@ -86,7 +93,7 @@ type ConfigWatcher interface {
 	//
 	// Cancel is an optional function to release resources in the producer. If
 	// provided, the consumer may call this function multiple times.
-	CreateDeltaWatch(*DeltaRequest, ClientState, chan DeltaResponse) (cancel func())
+	CreateDeltaWatch(*DeltaRequest, SubscriptionState, chan DeltaResponse) (cancel func(), err error)
 }
 
 // ConfigFetcher fetches configuration resources from cache
diff --git a/pkg/cache/v3/delta.go b/pkg/cache/v3/delta.go
index c5adf9fb8b..9dc3f87127 100644
--- a/pkg/cache/v3/delta.go
+++ b/pkg/cache/v3/delta.go
@@ -27,7 +27,7 @@ type resourceContainer struct {
 	systemVersion string
 }
 
-func createDeltaResponse(ctx context.Context, req *DeltaRequest, state ClientState, resources resourceContainer) *RawDeltaResponse {
+func createDeltaResponse(ctx context.Context, req *DeltaRequest, state SubscriptionState, resources resourceContainer) *RawDeltaResponse {
 	// variables to build our response with
 	var nextVersionMap map[string]string
 	var filtered []types.Resource
diff --git a/pkg/cache/v3/delta_test.go b/pkg/cache/v3/delta_test.go
index d423ce0465..25dddd6b29 100644
--- a/pkg/cache/v3/delta_test.go
+++ b/pkg/cache/v3/delta_test.go
@@ -35,14 +35,15 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
 	// Make our initial request as a wildcard to get all resources and make sure the wildcard requesting works as intended
 	for _, typ := range testTypes {
 		watches[typ] = make(chan cache.DeltaResponse, 1)
-		state := stream.NewStreamState(true, nil)
-		c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
+		state := stream.NewSubscriptionState(true, nil)
+		_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
 			Node: &core.Node{
 				Id: "node",
 			},
 			TypeUrl:                typ,
 			ResourceNamesSubscribe: names[typ],
 		}, state, watches[typ])
+		require.NoError(t, err)
 	}
 
 	if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil {
@@ -68,17 +69,18 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
 	// all resources as well as individual resource removals
 	for _, typ := range testTypes {
 		watches[typ] = make(chan cache.DeltaResponse, 1)
-		state := stream.NewStreamState(false, versionMap[typ])
+		state := stream.NewSubscriptionState(false, versionMap[typ])
 		for resource := range versionMap[typ] {
 			state.GetSubscribedResources()[resource] = struct{}{}
 		}
-		c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
+		_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
 			Node: &core.Node{
 				Id: "node",
 			},
 			TypeUrl:                typ,
 			ResourceNamesSubscribe: names[typ],
 		}, state, watches[typ])
+		require.NoError(t, err)
 	}
 
 	if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
@@ -111,21 +113,22 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
 func TestDeltaRemoveResources(t *testing.T) {
 	c := cache.NewSnapshotCache(false, group{}, logger{t: t})
 	watches := make(map[string]chan cache.DeltaResponse)
-	streams := make(map[string]*stream.StreamState)
+	streams := make(map[string]*stream.SubscriptionState)
 
 	// At this stage the cache is empty, so a watch is opened
 	for _, typ := range testTypes {
 		watches[typ] = make(chan cache.DeltaResponse, 1)
-		state := stream.NewStreamState(true, make(map[string]string))
+		state := stream.NewSubscriptionState(true, make(map[string]string))
 		streams[typ] = &state
 		// We don't specify any resource name subscriptions here because we want to make sure we test wildcard
 		// functionality. This means we should receive all resources back without requesting a subscription by name.
-		c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
+		_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
 			Node: &core.Node{
 				Id: "node",
 			},
 			TypeUrl: typ,
 		}, *streams[typ], watches[typ])
+		require.NoError(t, err)
 	}
 
 	snapshot := fixture.snapshot()
@@ -141,7 +144,7 @@ func TestDeltaRemoveResources(t *testing.T) {
 			case out := <-watches[typ]:
 				assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
 				nextVersionMap := out.GetNextVersionMap()
-				streams[typ].SetResourceVersions(nextVersionMap)
+				streams[typ].SetKnownResources(nextVersionMap)
 			case <-time.After(time.Second):
 				require.Fail(t, "failed to receive a snapshot response")
 			}
@@ -152,13 +155,14 @@ func TestDeltaRemoveResources(t *testing.T) {
 	// test the removal of certain resources from a partial snapshot
 	for _, typ := range testTypes {
 		watches[typ] = make(chan cache.DeltaResponse, 1)
-		c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
+		_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
 			Node: &core.Node{
 				Id: "node",
 			},
 			TypeUrl:       typ,
 			ResponseNonce: "nonce",
 		}, *streams[typ], watches[typ])
+		require.NoError(t, err)
 	}
 
 	assert.Equal(t, len(testTypes), c.GetStatusInfo(key).GetNumDeltaWatches(), "watches should be created for the latest version")
@@ -202,14 +206,15 @@ func TestConcurrentSetDeltaWatch(t *testing.T) {
 						t.Fatalf("snapshot failed: %s", err)
 					}
 				} else {
-					state := stream.NewStreamState(false, make(map[string]string))
-					cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
+					state := stream.NewSubscriptionState(false, make(map[string]string))
+					cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
 						Node: &core.Node{
 							Id: id,
 						},
 						TypeUrl:                rsrc.EndpointType,
 						ResourceNamesSubscribe: []string{clusterName},
 					}, state, responses)
+					require.NoError(t, err)
 
 					defer cancel()
 				}
@@ -225,21 +230,22 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {
 
 	// Create a non-buffered channel that will block sends.
 	watchCh := make(chan cache.DeltaResponse)
-	state := stream.NewStreamState(false, nil)
+	state := stream.NewSubscriptionState(false, nil)
 	state.SetSubscribedResources(map[string]struct{}{names[rsrc.EndpointType][0]: {}})
-	c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
+	_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
 		Node: &core.Node{
 			Id: key,
 		},
 		TypeUrl:                rsrc.EndpointType,
 		ResourceNamesSubscribe: names[rsrc.EndpointType],
 	}, state, watchCh)
+	require.NoError(t, err)
 
 	// The first time we set the snapshot without consuming from the blocking channel, so this should time out.
 	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
 	defer cancel()
 
-	err := c.SetSnapshot(ctx, key, fixture.snapshot())
+	err = c.SetSnapshot(ctx, key, fixture.snapshot())
 	assert.EqualError(t, err, context.Canceled.Error())
 
 	// Now reset the snapshot with a consuming channel. This verifies that if setting the snapshot fails,
@@ -269,14 +275,15 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) {
 	c := cache.NewSnapshotCache(true, group{}, logger{t: t})
 	for _, typ := range testTypes {
 		responses := make(chan cache.DeltaResponse, 1)
-		state := stream.NewStreamState(false, make(map[string]string))
-		cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
+		state := stream.NewSubscriptionState(false, make(map[string]string))
+		cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
 			Node: &core.Node{
 				Id: key,
 			},
 			TypeUrl:                typ,
 			ResourceNamesSubscribe: names[typ],
 		}, state, responses)
+		require.NoError(t, err)
 
 		// Cancel the watch
 		cancel()
diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go
index 33385b2959..f1c8fcac28 100644
--- a/pkg/cache/v3/linear.go
+++ b/pkg/cache/v3/linear.go
@@ -17,6 +17,7 @@ package cache
 import (
 	"context"
 	"errors"
+	"fmt"
 	"strconv"
 	"strings"
 	"sync"
@@ -163,11 +164,11 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
 		}
 
 		for id, watch := range cache.deltaWatches {
-			if !watch.WatchesResources(modified) {
+			if !watch.subscriptionState.WatchesResources(modified) {
 				continue
 			}
 
-			res := cache.respondDelta(watch.Request, watch.Response, watch.clientState)
+			res := cache.respondDelta(watch.Request, watch.Response, watch.subscriptionState)
 			if res != nil {
 				delete(cache.deltaWatches, id)
 			}
@@ -175,7 +176,7 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
 	}
 }
 
-func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, clientState ClientState) *RawDeltaResponse {
+func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, clientState SubscriptionState) *RawDeltaResponse {
 	resp := createDeltaResponse(context.Background(), request, clientState, resourceContainer{
 		resourceMap:   cache.resources,
 		versionMap:    cache.versionMap,
@@ -297,10 +298,10 @@ func (cache *LinearCache) GetResources() map[string]types.Resource {
 	return resources
 }
 
-func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState, value chan Response) func() {
+func (cache *LinearCache) CreateWatch(request *Request, _ SubscriptionState, value chan Response) (func(), error) {
 	if request.TypeUrl != cache.typeURL {
 		value <- nil
-		return nil
+		return nil, fmt.Errorf("request type %s does not match cache type %s", request.TypeUrl, cache.typeURL)
 	}
 	// If the version is not up to date, check whether any requested resource has
 	// been updated between the last version and the current version. This avoids the problem
@@ -336,7 +337,7 @@ func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState,
 	}
 	if stale {
 		cache.respond(value, staleResources)
-		return nil
+		return nil, nil
 	}
 	// Create open watches since versions are up to date.
 	if len(request.ResourceNames) == 0 {
@@ -345,7 +346,7 @@ func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState,
 			cache.mu.Lock()
 			defer cache.mu.Unlock()
 			delete(cache.watchAll, value)
-		}
+		}, nil
 	}
 	for _, name := range request.ResourceNames {
 		set, exists := cache.watches[name]
@@ -367,10 +368,10 @@ func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState,
 				delete(cache.watches, name)
 			}
 		}
-	}
+	}, nil
 }
 
-func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, clientState ClientState, value chan DeltaResponse) func() {
+func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, clientState SubscriptionState, value chan DeltaResponse) (func(), error) {
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
 
@@ -398,12 +399,12 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, clientState Cl
 				cache.typeURL, clientState.GetSubscribedResources(), cache.getVersion())
 		}
 
-		cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, clientState: clientState}
+		cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, subscriptionState: clientState}
 
-		return cache.cancelDeltaWatch(watchID)
+		return cache.cancelDeltaWatch(watchID), nil
 	}
 
-	return nil
+	return nil, nil
 }
 
 func (cache *LinearCache) updateVersionMap(modified map[string]struct{}) error {
diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go
index b5f9a0183a..51a67882fb 100644
--- a/pkg/cache/v3/linear_test.go
+++ b/pkg/cache/v3/linear_test.go
@@ -189,57 +189,62 @@ func hashResource(t *testing.T, resource types.Resource) string {
 	return v
 }
 
-func createWildcardDeltaWatch(c *LinearCache, w chan DeltaResponse) {
-	state := stream.NewStreamState(true, nil)
-	c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+func createWildcardDeltaWatch(c *LinearCache, w chan DeltaResponse) error {
+	state := stream.NewSubscriptionState(true, nil)
+	if _, err := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w); err != nil {
+		return err
+	}
 	resp := <-w
-	state.SetResourceVersions(resp.GetNextVersionMap())
-	c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) // Ensure the watch is set properly with cache values
+	state.SetKnownResources(resp.GetNextVersionMap())
+	_, err := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) // Ensure the watch is set properly with cache values
+	return err
 }
 
 func TestLinearInitialResources(t *testing.T) {
-	streamState := stream.NewStreamState(false, map[string]string{})
+	streamState := stream.NewSubscriptionState(false, map[string]string{})
 	c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
 	w := make(chan Response, 1)
-	c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, streamState, w)
+	_, err := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, streamState, w)
+	require.NoError(t, err)
 	verifyResponse(t, w, "0", 1)
-	c.CreateWatch(&Request{TypeUrl: testType}, streamState, w)
+	_, err = c.CreateWatch(&Request{TypeUrl: testType}, streamState, w)
+	require.NoError(t, err)
 	verifyResponse(t, w, "0", 2)
 	checkVersionMapNotSet(t, c)
 }
 
 func TestLinearCornerCases(t *testing.T) {
-	streamState := stream.NewStreamState(false, map[string]string{})
+	streamState := stream.NewSubscriptionState(false, map[string]string{})
 	c := NewLinearCache(testType)
 	err := c.UpdateResource("a", nil)
-	if err == nil {
-		t.Error("expected error on nil resource")
-	}
+	assert.Error(t, err, "expected error on nil resource")
+
 	// create an incorrect type URL request
 	w := make(chan Response, 1)
-	c.CreateWatch(&Request{TypeUrl: "test"}, streamState, w)
+	_, err = c.CreateWatch(&Request{TypeUrl: "test"}, streamState, w)
+	assert.Error(t, err, "watch should fail to be created")
 	select {
 	case r := <-w:
-		if r != nil {
-			t.Error("response should be nil")
-		}
+		assert.Nil(t, r, "response should be nil")
 	default:
-		t.Error("should receive nil response")
+		assert.Fail(t, "should receive nil response")
 	}
 }
 
 func TestLinearBasic(t *testing.T) {
-	streamState := stream.NewStreamState(false, map[string]string{})
+	streamState := stream.NewSubscriptionState(false, map[string]string{})
 	c := NewLinearCache(testType)
 
 	// Create watches before a resource is ready
 	w1 := make(chan Response, 1)
-	c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1)
+	_, err := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1)
+	require.NoError(t, err)
 	mustBlock(t, w1)
 	checkVersionMapNotSet(t, c)
 
 	w := make(chan Response, 1)
-	c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w)
+	_, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w)
+	require.NoError(t, err)
 	mustBlock(t, w)
 	checkWatchCount(t, c, "a", 2)
 	checkWatchCount(t, c, "b", 1)
@@ -250,34 +255,40 @@ func TestLinearBasic(t *testing.T) {
 	verifyResponse(t, w, "1", 1)
 
 	// Request again, should get same response
-	c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
+	_, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
+	require.NoError(t, err)
 	checkWatchCount(t, c, "a", 0)
 	verifyResponse(t, w, "1", 1)
-	c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w)
+	_, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w)
+	require.NoError(t, err)
 	checkWatchCount(t, c, "a", 0)
 	verifyResponse(t, w, "1", 1)
 
 	// Add another element and update the first, response should be different
 	require.NoError(t, c.UpdateResource("b", testResource("b")))
 	require.NoError(t, c.UpdateResource("a", testResource("aa")))
-	c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
+	_, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
+	require.NoError(t, err)
 	verifyResponse(t, w, "3", 1)
-	c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w)
+	_, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w)
+	require.NoError(t, err)
 	verifyResponse(t, w, "3", 2)
 	// Ensure the version map was not created as we only ever used stow watches
 	checkVersionMapNotSet(t, c)
 }
 
 func TestLinearSetResources(t *testing.T) {
-	streamState := stream.NewStreamState(false, map[string]string{})
+	streamState := stream.NewSubscriptionState(false, map[string]string{})
 	c := NewLinearCache(testType)
 
 	// Create new resources
 	w1 := make(chan Response, 1)
-	c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1)
+	_, err := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1)
+	require.NoError(t, err)
 	mustBlock(t, w1)
 	w2 := make(chan Response, 1)
-	c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w2)
+	_, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w2)
+	require.NoError(t, err)
 	mustBlock(t, w2)
 	c.SetResources(map[string]types.Resource{
 		"a": testResource("a"),
@@ -287,9 +298,11 @@ func TestLinearSetResources(t *testing.T) {
 	verifyResponse(t, w2, "1", 2) // the version was only incremented once for all resources
 
 	// Add another element and update the first, response should be different
-	c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w1)
+	_, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w1)
+	require.NoError(t, err)
 	mustBlock(t, w1)
-	c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w2)
+	_, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w2)
+	require.NoError(t, err)
 	mustBlock(t, w2)
 	c.SetResources(map[string]types.Resource{
 		"a": testResource("aa"),
@@ -300,9 +313,11 @@ func TestLinearSetResources(t *testing.T) {
 	verifyResponse(t, w2, "2", 3)
 
 	// Delete resource
-	c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, streamState, w1)
+	_, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, streamState, w1)
+	require.NoError(t, err)
 	mustBlock(t, w1)
-	c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, streamState, w2)
+	_, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, streamState, w2)
+	require.NoError(t, err)
 	mustBlock(t, w2)
 	c.SetResources(map[string]types.Resource{
 		"b": testResource("b"),
@@ -330,49 +345,57 @@ func TestLinearGetResources(t *testing.T) {
 }
 
 func TestLinearVersionPrefix(t *testing.T) {
-	streamState := stream.NewStreamState(false, map[string]string{})
+	streamState := stream.NewSubscriptionState(false, map[string]string{})
 	c := NewLinearCache(testType, WithVersionPrefix("instance1-"))
 
 	w := make(chan Response, 1)
-	c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
+	_, err := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
+	require.NoError(t, err)
 	verifyResponse(t, w, "instance1-0", 0)
 
 	require.NoError(t, c.UpdateResource("a", testResource("a")))
-	c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
+	_, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
+	require.NoError(t, err)
 	verifyResponse(t, w, "instance1-1", 1)
 
-	c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, streamState, w)
+	_, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, streamState, w)
+	require.NoError(t, err)
 	mustBlock(t, w)
 	checkWatchCount(t, c, "a", 1)
 }
 
 func TestLinearDeletion(t *testing.T) {
-	streamState := stream.NewStreamState(false, map[string]string{})
+	streamState := stream.NewSubscriptionState(false, map[string]string{})
 	c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
 	w := make(chan Response, 1)
-	c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
+	_, err := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
+	require.NoError(t, err)
 	mustBlock(t, w)
 	checkWatchCount(t, c, "a", 1)
 	require.NoError(t, c.DeleteResource("a"))
 	verifyResponse(t, w, "1", 0)
 	checkWatchCount(t, c, "a", 0)
-	c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w)
+	_, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w)
+	require.NoError(t, err)
 	verifyResponse(t, w, "1", 1)
 	checkWatchCount(t, c, "b", 0)
 	require.NoError(t, c.DeleteResource("b"))
-	c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w)
+	_, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w)
+	require.NoError(t, err)
 	verifyResponse(t, w, "2", 0)
 	checkWatchCount(t, c, "b", 0)
 }
 
 func TestLinearWatchTwo(t *testing.T) {
-	streamState := stream.NewStreamState(false, map[string]string{})
+	streamState := stream.NewSubscriptionState(false, map[string]string{})
 	c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
 	w := make(chan Response, 1)
-	c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
+	_, err := c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
+	require.NoError(t, err)
 	mustBlock(t, w)
 	w1 := make(chan Response, 1)
-	c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w1)
+	_, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w1)
+	require.NoError(t, err)
 	mustBlock(t, w1)
 	require.NoError(t, c.UpdateResource("a", testResource("aa")))
 	// should only get the modified resource
@@ -381,20 +404,22 @@ func TestLinearWatchTwo(t *testing.T) {
 }
 
 func TestLinearCancel(t *testing.T) {
-	streamState := stream.NewStreamState(false, map[string]string{})
+	streamState := stream.NewSubscriptionState(false, map[string]string{})
 	c := NewLinearCache(testType)
 	require.NoError(t, c.UpdateResource("a", testResource("a")))
 
 	// cancel watch-all
 	w := make(chan Response, 1)
-	cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w)
+	cancel, err := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w)
+	require.NoError(t, err)
 	mustBlock(t, w)
 	checkWatchCount(t, c, "a", 1)
 	cancel()
 	checkWatchCount(t, c, "a", 0)
 
 	// cancel watch for "a"
-	cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w)
+	cancel, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w)
+	require.NoError(t, err)
 	mustBlock(t, w)
 	checkWatchCount(t, c, "a", 1)
 	cancel()
@@ -404,10 +429,14 @@ func TestLinearCancel(t *testing.T) {
 	w2 := make(chan Response, 1)
 	w3 := make(chan Response, 1)
 	w4 := make(chan Response, 1)
-	cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w)
-	cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w2)
-	cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w3)
-	cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w4)
+	cancel, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w)
+	require.NoError(t, err)
+	cancel2, err := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w2)
+	require.NoError(t, err)
+	cancel3, err := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w3)
+	require.NoError(t, err)
+	cancel4, err := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w4)
+	require.NoError(t, err)
 	mustBlock(t, w)
 	mustBlock(t, w2)
 	mustBlock(t, w3)
@@ -429,7 +458,7 @@ func TestLinearCancel(t *testing.T) {
 // TODO(mattklein123): This test requires GOMAXPROCS or -parallel >= 100. This should be
 // rewritten to not require that. This is not the case in the GH actions environment.
 func TestLinearConcurrentSetWatch(t *testing.T) {
-	streamState := stream.NewStreamState(false, map[string]string{})
+	streamState := stream.NewSubscriptionState(false, map[string]string{})
 	c := NewLinearCache(testType)
 	n := 50
 	for i := 0; i < 2*n; i++ {
@@ -444,12 +473,13 @@ func TestLinearConcurrentSetWatch(t *testing.T) {
 					id2 := fmt.Sprintf("%d", i-1)
 					t.Logf("request resources %q and %q", id, id2)
 					value := make(chan Response, 1)
-					c.CreateWatch(&Request{
+					_, err := c.CreateWatch(&Request{
 						// Only expect one to become stale
 						ResourceNames: []string{id, id2},
 						VersionInfo:   "0",
 						TypeUrl:       testType,
 					}, streamState, value)
+					require.NoError(t, err)
 					// wait until all updates apply
 					verifyResponse(t, value, "", 1)
 				}
@@ -460,19 +490,21 @@ func TestLinearConcurrentSetWatch(t *testing.T) {
 
 func TestLinearDeltaWildcard(t *testing.T) {
 	c := NewLinearCache(testType)
-	state1 := stream.NewStreamState(true, map[string]string{})
+	state1 := stream.NewSubscriptionState(true, map[string]string{})
 	w1 := make(chan DeltaResponse, 1)
-	c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state1, w1)
+	_, err := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state1, w1)
+	require.NoError(t, err)
 	mustBlockDelta(t, w1)
-	state2 := stream.NewStreamState(true, map[string]string{})
+	state2 := stream.NewSubscriptionState(true, map[string]string{})
 	w2 := make(chan DeltaResponse, 1)
-	c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state2, w2)
+	_, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state2, w2)
+	require.NoError(t, err)
 	mustBlockDelta(t, w1)
 	checkDeltaWatchCount(t, c, 2)
 
 	a := &endpoint.ClusterLoadAssignment{ClusterName: "a"}
 	hash := hashResource(t, a)
-	err := c.UpdateResource("a", a)
+	err = c.UpdateResource("a", a)
 	assert.NoError(t, err)
 	checkDeltaWatchCount(t, c, 0)
 	verifyDeltaResponse(t, w1, []resourceInfo{{"a", hash}}, nil)
@@ -490,17 +522,19 @@ func TestLinearDeltaExistingResources(t *testing.T) {
 	err = c.UpdateResource("b", b)
 	assert.NoError(t, err)
 
-	state := stream.NewStreamState(false, nil)
+	state := stream.NewSubscriptionState(false, nil)
 	state.SetSubscribedResources(map[string]struct{}{"b": {}, "c": {}}) // watching b and c - not interested in a
 	w := make(chan DeltaResponse, 1)
-	c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	_, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	require.NoError(t, err)
 	checkDeltaWatchCount(t, c, 0)
 	verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}, []string{})
 
-	state = stream.NewStreamState(false, nil)
+	state = stream.NewSubscriptionState(false, nil)
 	state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}})
 	w = make(chan DeltaResponse, 1)
-	c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	_, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	require.NoError(t, err)
 	checkDeltaWatchCount(t, c, 0)
 	verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil)
 }
@@ -516,17 +550,19 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) {
 	err = c.UpdateResource("b", b)
 	assert.NoError(t, err)
 
-	state := stream.NewStreamState(false, map[string]string{"b": hashB})
+	state := stream.NewSubscriptionState(false, map[string]string{"b": hashB})
 	state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}})
 	w := make(chan DeltaResponse, 1)
-	c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	_, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	require.NoError(t, err)
 	checkDeltaWatchCount(t, c, 0)
 	verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, nil) // b is up to date and shouldn't be returned
 
-	state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB})
+	state = stream.NewSubscriptionState(false, map[string]string{"a": hashA, "b": hashB})
 	state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}})
 	w = make(chan DeltaResponse, 1)
-	c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	_, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	require.NoError(t, err)
 	mustBlockDelta(t, w)
 	checkDeltaWatchCount(t, c, 1)
 	b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{{Priority: 10}}} // new version of b
@@ -550,18 +586,20 @@ func TestLinearDeltaResourceUpdate(t *testing.T) {
 	// There is currently no delta watch
 	checkVersionMapNotSet(t, c)
 
-	state := stream.NewStreamState(false, nil)
+	state := stream.NewSubscriptionState(false, nil)
 	state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}})
 	w := make(chan DeltaResponse, 1)
-	c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	_, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	require.NoError(t, err)
 	checkDeltaWatchCount(t, c, 0)
 	verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil)
 	checkVersionMapSet(t, c)
 
-	state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB})
+	state = stream.NewSubscriptionState(false, map[string]string{"a": hashA, "b": hashB})
 	state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}})
 	w = make(chan DeltaResponse, 1)
-	c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	_, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	require.NoError(t, err)
 	mustBlockDelta(t, w)
 	checkDeltaWatchCount(t, c, 1)
 
@@ -586,17 +624,19 @@ func TestLinearDeltaResourceDelete(t *testing.T) {
 	err = c.UpdateResource("b", b)
 	assert.NoError(t, err)
 
-	state := stream.NewStreamState(false, nil)
+	state := stream.NewSubscriptionState(false, nil)
 	state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}})
 	w := make(chan DeltaResponse, 1)
-	c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	_, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	require.NoError(t, err)
 	checkDeltaWatchCount(t, c, 0)
 	verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil)
 
-	state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB})
+	state = stream.NewSubscriptionState(false, map[string]string{"a": hashA, "b": hashB})
 	state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}})
 	w = make(chan DeltaResponse, 1)
-	c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	_, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	require.NoError(t, err)
 	mustBlockDelta(t, w)
 	checkDeltaWatchCount(t, c, 1)
 
@@ -611,14 +651,15 @@ func TestLinearDeltaResourceDelete(t *testing.T) {
 func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
 	c := NewLinearCache(testType)
 
-	state := stream.NewStreamState(false, nil)
+	state := stream.NewSubscriptionState(false, nil)
 	state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}})
 	w := make(chan DeltaResponse, 1)
 	checkVersionMapNotSet(t, c)
 	assert.Equal(t, 0, c.NumResources())
 
 	// Initial update
-	c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	_, err := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	require.NoError(t, err)
 	mustBlockDelta(t, w)
 	checkDeltaWatchCount(t, c, 1)
 	// The version map should now be created, even if empty
@@ -627,16 +668,17 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
 	hashA := hashResource(t, a)
 	b := &endpoint.ClusterLoadAssignment{ClusterName: "b"}
 	hashB := hashResource(t, b)
-	err := c.UpdateResources(map[string]types.Resource{"a": a, "b": b}, nil)
+	err = c.UpdateResources(map[string]types.Resource{"a": a, "b": b}, nil)
 	assert.NoError(t, err)
 	resp := <-w
 	validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil)
 	checkVersionMapSet(t, c)
 	assert.Equal(t, 2, c.NumResources())
-	state.SetResourceVersions(resp.GetNextVersionMap())
+	state.SetKnownResources(resp.GetNextVersionMap())
 
 	// Multiple updates
-	c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	_, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	require.NoError(t, err)
 	mustBlockDelta(t, w)
 	checkDeltaWatchCount(t, c, 1)
 	a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update
@@ -653,10 +695,11 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
 	validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil)
 	checkVersionMapSet(t, c)
 	assert.Equal(t, 2, c.NumResources())
-	state.SetResourceVersions(resp.GetNextVersionMap())
+	state.SetKnownResources(resp.GetNextVersionMap())
 
 	// Update/add/delete
-	c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	_, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	require.NoError(t, err)
 	mustBlockDelta(t, w)
 	checkDeltaWatchCount(t, c, 1)
 	a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update
@@ -672,10 +715,11 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
 	validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}}, []string{"b"})
 	checkVersionMapSet(t, c)
 	assert.Equal(t, 2, c.NumResources())
-	state.SetResourceVersions(resp.GetNextVersionMap())
+	state.SetKnownResources(resp.GetNextVersionMap())
 
 	// Re-add previously deleted watched resource
-	c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	_, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
+	require.NoError(t, err)
 	mustBlockDelta(t, w)
 	checkDeltaWatchCount(t, c, 1)
 	b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{}} // recreate watched resource
@@ -688,10 +732,10 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
 	validateDeltaResponse(t, resp, []resourceInfo{{"b", hashB}}, nil) // d is not watched and should not be returned
 	checkVersionMapSet(t, c)
 	assert.Equal(t, 2, c.NumResources())
-	state.SetResourceVersions(resp.GetNextVersionMap())
+	state.SetKnownResources(resp.GetNextVersionMap())
 
 	// Wildcard create/update
-	createWildcardDeltaWatch(c, w)
+	require.NoError(t, createWildcardDeltaWatch(c, w))
 	mustBlockDelta(t, w)
 	checkDeltaWatchCount(t, c, 1)
 	b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update
@@ -707,7 +751,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
 	assert.Equal(t, 3, c.NumResources())
 
 	// Wildcard update/delete
-	createWildcardDeltaWatch(c, w)
+	require.NoError(t, createWildcardDeltaWatch(c, w))
 	mustBlockDelta(t, w)
 	checkDeltaWatchCount(t, c, 1)
 	a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update
@@ -736,9 +780,10 @@ func TestLinearMixedWatches(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Equal(t, 2, c.NumResources())
 
-	sotwState := stream.NewStreamState(false, nil)
+	sotwState := stream.NewSubscriptionState(false, nil)
 	w := make(chan Response, 1)
-	c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, &sotwState, w)
+	_, err = c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, &sotwState, w)
+	require.NoError(t, err)
 	mustBlock(t, w)
 	checkVersionMapNotSet(t, c)
 
@@ -752,16 +797,18 @@ func TestLinearMixedWatches(t *testing.T) {
 	verifyResponse(t, w, c.getVersion(), 1)
 	checkVersionMapNotSet(t, c)
 
-	c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, &sotwState, w)
+	_, err = c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, &sotwState, w)
+	require.NoError(t, err)
 	mustBlock(t, w)
 	checkVersionMapNotSet(t, c)
 
-	deltaState := stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB})
+	deltaState := stream.NewSubscriptionState(false, map[string]string{"a": hashA, "b": hashB})
 	deltaState.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}})
 	wd := make(chan DeltaResponse, 1)
 
 	// Initial update
-	c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &deltaState, wd)
+	_, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &deltaState, wd)
+	require.NoError(t, err)
 	mustBlockDelta(t, wd)
 	checkDeltaWatchCount(t, c, 1)
 	checkVersionMapSet(t, c)
diff --git a/pkg/cache/v3/mux.go b/pkg/cache/v3/mux.go
index dd4c13a3e5..d4bb5791a9 100644
--- a/pkg/cache/v3/mux.go
+++ b/pkg/cache/v3/mux.go
@@ -17,6 +17,7 @@ package cache
 import (
 	"context"
 	"errors"
+	"fmt"
 )
 
 // MuxCache multiplexes across several caches using a classification function.
@@ -35,22 +36,22 @@ type MuxCache struct {
 
 var _ Cache = &MuxCache{}
 
-func (mux *MuxCache) CreateWatch(request *Request, state ClientState, value chan Response) func() {
+func (mux *MuxCache) CreateWatch(request *Request, state SubscriptionState, value chan Response) (func(), error) {
 	key := mux.Classify(request)
 	cache, exists := mux.Caches[key]
 	if !exists {
 		value <- nil
-		return nil
+		return nil, fmt.Errorf("no cache defined for key %s", key)
 	}
 	return cache.CreateWatch(request, state, value)
 }
 
-func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state ClientState, value chan DeltaResponse) func() {
+func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state SubscriptionState, value chan DeltaResponse) (func(), error) {
 	key := mux.ClassifyDelta(request)
 	cache, exists := mux.Caches[key]
 	if !exists {
 		value <- nil
-		return nil
+		return nil, fmt.Errorf("no cache defined for key %s", key)
 	}
 	return cache.CreateDeltaWatch(request, state, value)
 }
diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go
index 216b2e41a1..36a344ff28 100644
--- a/pkg/cache/v3/simple.go
+++ b/pkg/cache/v3/simple.go
@@ -286,7 +286,7 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh
 				snapshot,
 				watch.Request,
 				watch.Response,
-				watch.clientState,
+				watch.subscriptionState,
 			)
 			if err != nil {
 				return err
@@ -344,7 +344,7 @@ func superset(names map[string]bool, resources map[string]types.ResourceWithTTL)
 
 // CreateWatch returns a watch for an xDS request.  A nil function may be
 // returned if an error occurs.
-func (cache *snapshotCache) CreateWatch(request *Request, clientState ClientState, value chan Response) func() {
+func (cache *snapshotCache) CreateWatch(request *Request, clientState SubscriptionState, value chan Response) (func(), error) {
 	nodeID := cache.hash.ID(request.Node)
 
 	cache.mu.Lock()
@@ -386,9 +386,9 @@ func (cache *snapshotCache) CreateWatch(request *Request, clientState ClientStat
 					if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil {
 						cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.TypeUrl,
 							request.ResourceNames, nodeID, err)
-						return nil
+						return nil, fmt.Errorf("failed to send the response: %w", err)
 					}
-					return func() {}
+					return func() {}, nil
 				}
 			}
 		}
@@ -401,7 +401,7 @@ func (cache *snapshotCache) CreateWatch(request *Request, clientState ClientStat
 		info.mu.Lock()
 		info.watches[watchID] = ResponseWatch{Request: request, Response: value}
 		info.mu.Unlock()
-		return cache.cancelWatch(nodeID, watchID)
+		return cache.cancelWatch(nodeID, watchID), nil
 	}
 
 	// otherwise, the watch may be responded immediately
@@ -409,10 +409,10 @@ func (cache *snapshotCache) CreateWatch(request *Request, clientState ClientStat
 	if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil {
 		cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.TypeUrl,
 			request.ResourceNames, nodeID, err)
-		return nil
+		return nil, fmt.Errorf("failed to send the response: %w", err)
 	}
 
-	return func() {}
+	return func() {}, nil
 }
 
 func (cache *snapshotCache) nextWatchID() int64 {
@@ -484,7 +484,7 @@ func createResponse(ctx context.Context, request *Request, resources map[string]
 }
 
 // CreateDeltaWatch returns a watch for a delta xDS request which implements the Simple SnapshotCache.
-func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, clientState ClientState, value chan DeltaResponse) func() {
+func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, clientState SubscriptionState, value chan DeltaResponse) (func(), error) {
 	nodeID := cache.hash.ID(request.Node)
 	t := request.GetTypeUrl()
 
@@ -530,15 +530,15 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, clientState
 			cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, clientState.GetSubscribedResources(), nodeID)
 		}
 
-		info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, clientState: clientState})
-		return cache.cancelDeltaWatch(nodeID, watchID)
+		info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, subscriptionState: clientState})
+		return cache.cancelDeltaWatch(nodeID, watchID), nil
 	}
 
-	return nil
+	return nil, nil
 }
 
 // Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change.
-func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, clientState ClientState) (*RawDeltaResponse, error) {
+func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, clientState SubscriptionState) (*RawDeltaResponse, error) {
 	resp := createDeltaResponse(ctx, request, clientState, resourceContainer{
 		resourceMap:   snapshot.GetResources(request.TypeUrl),
 		versionMap:    snapshot.GetVersionMap(request.TypeUrl),
diff --git a/pkg/cache/v3/simple_test.go b/pkg/cache/v3/simple_test.go
index 0e11e4f093..3179df45d3 100644
--- a/pkg/cache/v3/simple_test.go
+++ b/pkg/cache/v3/simple_test.go
@@ -131,13 +131,14 @@ func TestSnapshotCacheWithTTL(t *testing.T) {
 
 	wg := sync.WaitGroup{}
 	// All the resources should respond immediately when version is not up to date.
-	streamState := stream.NewStreamState(false, map[string]string{})
+	streamState := stream.NewSubscriptionState(false, map[string]string{})
 	for _, typ := range testTypes {
 		wg.Add(1)
 		t.Run(typ, func(t *testing.T) {
 			defer wg.Done()
 			value := make(chan cache.Response, 1)
-			c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, value)
+			_, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, value)
+			require.NoError(t, err)
 			select {
 			case out := <-value:
 				if gotVersion, _ := out.GetVersion(); gotVersion != fixture.version {
@@ -168,8 +169,9 @@ func TestSnapshotCacheWithTTL(t *testing.T) {
 			end := time.After(5 * time.Second)
 			for {
 				value := make(chan cache.Response, 1)
-				cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: fixture.version},
+				cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: fixture.version},
 					streamState, value)
+				require.NoError(t, err)
 
 				select {
 				case out := <-value:
@@ -230,9 +232,11 @@ func TestSnapshotCache(t *testing.T) {
 	// try to get endpoints with incorrect list of names
 	// should not receive response
 	value := make(chan cache.Response, 1)
-	streamState := stream.NewStreamState(false, map[string]string{})
-	c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}},
+	streamState := stream.NewSubscriptionState(false, map[string]string{})
+	_, err = c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}},
 		streamState, value)
+	require.NoError(t, err)
+
 	select {
 	case out := <-value:
 		t.Errorf("watch for endpoints and mismatched names => got %v, want none", out)
@@ -242,9 +246,10 @@ func TestSnapshotCache(t *testing.T) {
 	for _, typ := range testTypes {
 		t.Run(typ, func(t *testing.T) {
 			value := make(chan cache.Response, 1)
-			streamState := stream.NewStreamState(false, map[string]string{})
-			c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]},
+			streamState := stream.NewSubscriptionState(false, map[string]string{})
+			_, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]},
 				streamState, value)
+			require.NoError(t, err)
 			select {
 			case out := <-value:
 				snapshot := fixture.snapshot()
@@ -295,10 +300,11 @@ func TestSnapshotCacheFetch(t *testing.T) {
 func TestSnapshotCacheWatch(t *testing.T) {
 	c := cache.NewSnapshotCache(true, group{}, logger{t: t})
 	watches := make(map[string]chan cache.Response)
-	streamState := stream.NewStreamState(false, map[string]string{})
+	streamState := stream.NewSubscriptionState(false, map[string]string{})
 	for _, typ := range testTypes {
 		watches[typ] = make(chan cache.Response, 1)
-		c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, watches[typ])
+		_, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, watches[typ])
+		require.NoError(t, err)
 	}
 	if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil {
 		t.Fatal(err)
@@ -326,8 +332,9 @@ func TestSnapshotCacheWatch(t *testing.T) {
 	// open new watches with the latest version
 	for _, typ := range testTypes {
 		watches[typ] = make(chan cache.Response, 1)
-		c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: fixture.version},
+		_, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: fixture.version},
 			streamState, watches[typ])
+		require.NoError(t, err)
 	}
 	if count := c.GetStatusInfo(key).GetNumWatches(); count != len(testTypes) {
 		t.Errorf("watches should be created for the latest version: %d", count)
@@ -372,11 +379,12 @@ func TestConcurrentSetWatch(t *testing.T) {
 					t.Fatalf("failed to set snapshot %q: %s", id, err)
 				}
 			} else {
-				streamState := stream.NewStreamState(false, map[string]string{})
-				cancel := c.CreateWatch(&discovery.DiscoveryRequest{
+				streamState := stream.NewSubscriptionState(false, map[string]string{})
+				cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{
 					Node:    &core.Node{Id: id},
 					TypeUrl: rsrc.EndpointType,
 				}, streamState, value)
+				require.NoError(t, err)
 
 				defer cancel()
 			}
@@ -386,10 +394,11 @@ func TestConcurrentSetWatch(t *testing.T) {
 
 func TestSnapshotCacheWatchCancel(t *testing.T) {
 	c := cache.NewSnapshotCache(true, group{}, logger{t: t})
-	streamState := stream.NewStreamState(false, map[string]string{})
+	streamState := stream.NewSubscriptionState(false, map[string]string{})
 	for _, typ := range testTypes {
 		value := make(chan cache.Response, 1)
-		cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, value)
+		cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, value)
+		require.NoError(t, err)
 		cancel()
 	}
 	// should be status info for the node
@@ -413,15 +422,16 @@ func TestSnapshotCacheWatchTimeout(t *testing.T) {
 
 	// Create a non-buffered channel that will block sends.
 	watchCh := make(chan cache.Response)
-	streamState := stream.NewStreamState(false, map[string]string{})
-	c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: names[rsrc.EndpointType]},
+	streamState := stream.NewSubscriptionState(false, map[string]string{})
+	_, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: names[rsrc.EndpointType]},
 		streamState, watchCh)
+	require.NoError(t, err)
 
 	// The first time we set the snapshot without consuming from the blocking channel, so this should time out.
 	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
 	defer cancel()
 
-	err := c.SetSnapshot(ctx, key, fixture.snapshot())
+	err = c.SetSnapshot(ctx, key, fixture.snapshot())
 	assert.EqualError(t, err, context.Canceled.Error())
 
 	// Now reset the snapshot with a consuming channel. This verifies that if setting the snapshot fails,
@@ -469,9 +479,10 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) {
 
 	// Request resource with name=ClusterName
 	go func() {
-		state := stream.NewStreamState(false, map[string]string{})
-		c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{clusterName}},
+		state := stream.NewSubscriptionState(false, map[string]string{})
+		_, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{clusterName}},
 			&state, watch)
+		require.NoError(t, err)
 	}()
 
 	select {
@@ -489,10 +500,11 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) {
 
 	// Request additional resource with name=clusterName2 for same version
 	go func() {
-		state := stream.NewStreamState(false, map[string]string{})
-		state.SetResourceVersions(map[string]string{clusterName: fixture.version})
-		c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version,
+		state := stream.NewSubscriptionState(false, map[string]string{})
+		state.SetKnownResources(map[string]string{clusterName: fixture.version})
+		_, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version,
 			ResourceNames: []string{clusterName, clusterName2}}, &state, watch)
+		require.NoError(t, err)
 	}()
 
 	select {
@@ -508,12 +520,13 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) {
 	}
 
 	// Repeat request for with same version and make sure a watch is created
-	state := stream.NewStreamState(false, map[string]string{})
-	state.SetResourceVersions(map[string]string{clusterName: fixture.version, clusterName2: fixture.version})
-	if cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version,
+	state := stream.NewSubscriptionState(false, map[string]string{})
+	state.SetKnownResources(map[string]string{clusterName: fixture.version, clusterName2: fixture.version})
+	if cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version,
 		ResourceNames: []string{clusterName, clusterName2}}, &state, watch); cancel == nil {
 		t.Fatal("Should create a watch")
 	} else {
+		require.NoError(t, err)
 		cancel()
 	}
 }
@@ -640,9 +653,10 @@ func TestAvertPanicForWatchOnNonExistentSnapshot(t *testing.T) {
 		ResourceNames: []string{"rtds"},
 		TypeUrl:       rsrc.RuntimeType,
 	}
-	ss := stream.NewStreamState(false, map[string]string{"cluster": "abcdef"})
+	ss := stream.NewSubscriptionState(false, map[string]string{"cluster": "abcdef"})
 	responder := make(chan cache.Response)
-	c.CreateWatch(req, &ss, responder)
+	_, err := c.CreateWatch(req, &ss, responder)
+	require.NoError(t, err)
 
 	go func() {
 		// Wait for at least one heartbeat to occur, then set snapshot.
diff --git a/pkg/cache/v3/status.go b/pkg/cache/v3/status.go
index dd867f88c7..0495261e52 100644
--- a/pkg/cache/v3/status.go
+++ b/pkg/cache/v3/status.go
@@ -100,23 +100,7 @@ type DeltaResponseWatch struct {
 	Response chan DeltaResponse
 
 	// VersionMap for the stream
-	clientState ClientState
-}
-
-// WatchesResources returns whether at least one of the resources provided is currently being watched by the stream.
-// It is currently only applicable to delta-xds.
-// If the request is wildcard, it will always return true,
-// otherwise it will compare the provided resources to the list of resources currently subscribed
-func (w *DeltaResponseWatch) WatchesResources(resourceNames map[string]struct{}) bool {
-	if w.clientState.IsWildcard() {
-		return true
-	}
-	for resourceName := range resourceNames {
-		if _, ok := w.clientState.GetSubscribedResources()[resourceName]; ok {
-			return true
-		}
-	}
-	return false
+	subscriptionState SubscriptionState
 }
 
 // newStatusInfo initializes a status info data structure.
diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go
index ebadd74849..af14293f51 100644
--- a/pkg/server/delta/v3/server.go
+++ b/pkg/server/delta/v3/server.go
@@ -131,7 +131,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
 			watch := watches.deltaWatches[typ]
 			watch.nonce = nonce
 
-			watch.state.SetResourceVersions(resp.GetNextVersionMap())
+			watch.state.SetKnownResources(resp.GetNextVersionMap())
 			watches.deltaWatches[typ] = watch
 		case req, more := <-reqCh:
 			// input stream ended or errored out
@@ -176,7 +176,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
 				// We also set the stream as wildcard based on its legacy meaning (no resource name sent in resource_names_subscribe).
 				// If the state starts with this legacy mode, adding new resources will not unsubscribe from wildcard.
 				// It can still be done by explicitly unsubscribing from "*"
-				watch.state = stream.NewStreamState(len(req.GetResourceNamesSubscribe()) == 0, req.GetInitialResourceVersions())
+				watch.state = stream.NewSubscriptionState(len(req.GetResourceNamesSubscribe()) == 0, req.GetInitialResourceVersions())
 			} else {
 				watch.Cancel()
 			}
@@ -185,7 +185,11 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
 			s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state)
 
 			watch.responses = make(chan cache.DeltaResponse, 1)
-			watch.cancel = s.cache.CreateDeltaWatch(req, &watch.state, watch.responses)
+			var err error
+			watch.cancel, err = s.cache.CreateDeltaWatch(req, &watch.state, watch.responses)
+			if err != nil {
+				return err
+			}
 			watches.deltaWatches[typeURL] = watch
 
 			go func() {
@@ -226,7 +230,7 @@ func (s *server) DeltaStreamHandler(str stream.DeltaStream, typeURL string) erro
 
 // When we subscribe, we just want to make the cache know we are subscribing to a resource.
 // Even if the stream is wildcard, we keep the list of explicitly subscribed resources as the wildcard subscription can be discarded later on.
-func (s *server) subscribe(resources []string, streamState *stream.StreamState) {
+func (s *server) subscribe(resources []string, streamState *stream.SubscriptionState) {
 	sv := streamState.GetSubscribedResources()
 	for _, resource := range resources {
 		if resource == "*" {
@@ -239,7 +243,7 @@ func (s *server) subscribe(resources []string, streamState *stream.StreamState)
 
 // Unsubscriptions remove resources from the stream's subscribed resource list.
 // If a client explicitly unsubscribes from a wildcard request, the stream is updated and now watches only subscribed resources.
-func (s *server) unsubscribe(resources []string, streamState *stream.StreamState) {
+func (s *server) unsubscribe(resources []string, streamState *stream.SubscriptionState) {
 	sv := streamState.GetSubscribedResources()
 	for _, resource := range resources {
 		if resource == "*" {
diff --git a/pkg/server/delta/v3/watches.go b/pkg/server/delta/v3/watches.go
index c88548388a..1712ed37dd 100644
--- a/pkg/server/delta/v3/watches.go
+++ b/pkg/server/delta/v3/watches.go
@@ -36,7 +36,7 @@ type watch struct {
 	cancel    func()
 	nonce     string
 
-	state stream.StreamState
+	state stream.SubscriptionState
 }
 
 // Cancel calls terminate and cancel
diff --git a/pkg/server/sotw/v3/ads.go b/pkg/server/sotw/v3/ads.go
index 417d9c3689..ae2ac7f40e 100644
--- a/pkg/server/sotw/v3/ads.go
+++ b/pkg/server/sotw/v3/ads.go
@@ -8,6 +8,7 @@ import (
 	"github.com/envoyproxy/go-control-plane/pkg/cache/types"
 	"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
 	"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
+	"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
 )
 
 // process handles a bi-di stream request
@@ -101,15 +102,23 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe
 				}
 			}
 
-			streamState := sw.streamStates[req.TypeUrl]
+			streamState, ok := sw.streamStates[req.TypeUrl]
+			if !ok {
+				// Supports legacy wildcard mode
+				// Wildcard will be set to true if no resource is set
+				streamState = stream.NewSubscriptionState(len(req.ResourceNames) == 0, nil)
+			}
 
+			// ToDo: track ACK through subscription state
 			if lastResponse, ok := sw.lastDiscoveryResponses[req.TypeUrl]; ok {
 				if lastResponse.nonce == "" || lastResponse.nonce == nonce {
 					// Let's record Resource names that a client has received.
-					streamState.SetResourceVersions(lastResponse.resources)
+					streamState.SetKnownResources(lastResponse.resources)
 				}
 			}
 
+			updateSubscriptionResources(req, &streamState)
+
 			typeURL := req.GetTypeUrl()
 			// Use the multiplexed channel for new watches.
 			responder := sw.watches.responders[resource.AnyType].response
@@ -124,16 +133,26 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe
 						return err
 					}
 
+					cancel, err := s.cache.CreateWatch(req, streamState, responder)
+					if err != nil {
+						return err
+					}
+
 					sw.watches.addWatch(typeURL, &watch{
-						cancel:   s.cache.CreateWatch(req, streamState, responder),
+						cancel:   cancel,
 						response: responder,
 					})
 				}
 			} else {
 				// No pre-existing watch exists, let's create one.
 				// We need to precompute the watches first then open a watch in the cache.
+				cancel, err := s.cache.CreateWatch(req, streamState, responder)
+				if err != nil {
+					return err
+				}
+
 				sw.watches.addWatch(typeURL, &watch{
-					cancel:   s.cache.CreateWatch(req, streamState, responder),
+					cancel:   cancel,
 					response: responder,
 				})
 			}
diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go
index a5d2a257f9..7fcb64b013 100644
--- a/pkg/server/sotw/v3/server.go
+++ b/pkg/server/sotw/v3/server.go
@@ -91,7 +91,7 @@ type streamWrapper struct {
 
 	// The below fields are used for tracking resource
 	// cache state and should be maintained per stream.
-	streamStates           map[string]stream.StreamState
+	streamStates           map[string]stream.SubscriptionState
 	lastDiscoveryResponses map[string]lastDiscoveryResponse
 }
 
diff --git a/pkg/server/sotw/v3/xds.go b/pkg/server/sotw/v3/xds.go
index 61cc07905b..59d90b6257 100644
--- a/pkg/server/sotw/v3/xds.go
+++ b/pkg/server/sotw/v3/xds.go
@@ -27,7 +27,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque
 
 		// a collection of stack allocated watches per request type.
 		watches:                newWatches(),
-		streamStates:           make(map[string]stream.StreamState),
+		streamStates:           make(map[string]stream.SubscriptionState),
 		lastDiscoveryResponses: make(map[string]lastDiscoveryResponse),
 	}
 
@@ -121,10 +121,12 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque
 			if lastResponse, ok := sw.lastDiscoveryResponses[req.TypeUrl]; ok {
 				if lastResponse.nonce == "" || lastResponse.nonce == nonce {
 					// Let's record Resource names that a client has received.
-					streamState.SetResourceVersions(lastResponse.resources)
+					streamState.SetKnownResources(lastResponse.resources)
 				}
 			}
 
+			updateSubscriptionResources(req, &streamState)
+
 			typeURL := req.GetTypeUrl()
 			responder := make(chan cache.Response, 1)
 			if w, ok := sw.watches.responders[typeURL]; ok {
@@ -133,16 +135,24 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque
 				if w.nonce == "" || w.nonce == nonce {
 					w.close()
 
+					cancel, err := s.cache.CreateWatch(req, streamState, responder)
+					if err != nil {
+						return err
+					}
 					sw.watches.addWatch(typeURL, &watch{
-						cancel:   s.cache.CreateWatch(req, streamState, responder),
+						cancel:   cancel,
 						response: responder,
 					})
 				}
 			} else {
 				// No pre-existing watch exists, let's create one.
 				// We need to precompute the watches first then open a watch in the cache.
+				cancel, err := s.cache.CreateWatch(req, streamState, responder)
+				if err != nil {
+					return err
+				}
 				sw.watches.addWatch(typeURL, &watch{
-					cancel:   s.cache.CreateWatch(req, streamState, responder),
+					cancel:   cancel,
 					response: responder,
 				})
 			}
@@ -168,3 +178,36 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque
 		}
 	}
 }
+
+// updateSubscriptionResources provides a normalized view of resources to be used in Cache
+// It is also implementing the new behavior of wildcard as described in
+// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return
+func updateSubscriptionResources(req *discovery.DiscoveryRequest, subscriptionState *stream.SubscriptionState) {
+	subscribedResources := make(map[string]struct{}, len(req.ResourceNames))
+	explicitWildcard := false
+	for _, resource := range req.ResourceNames {
+		if resource == "*" {
+			explicitWildcard = true
+		} else {
+			subscribedResources[resource] = struct{}{}
+		}
+	}
+
+	if subscriptionState.IsWildcard() && len(req.ResourceNames) == 0 && len(subscriptionState.GetSubscribedResources()) == 0 {
+		// We were wildcard and no resource has been subscribed
+		// Legacy wildcard mode states that we remain in wildcard mode
+		subscriptionState.SetWildcard(true)
+	} else if explicitWildcard {
+		// Explicit subscription to wildcard
+		// Documentation states that we should no longer allow to fallback to the previous case
+		// and no longer setting wildcard would no longer subscribe to anything
+		// For now we ignore this case and will not support unsubscribing in this case
+		subscriptionState.SetWildcard(true)
+	} else {
+		// The subscription is currently not wildcard, or there are resources or have been resources subscribed to
+		// This is no longer the legacy wildcard case as described by the specification
+		subscriptionState.SetWildcard(false)
+	}
+	subscriptionState.SetSubscribedResources(subscribedResources)
+
+}
diff --git a/pkg/server/stream/v3/stream.go b/pkg/server/stream/v3/stream.go
index 28dea295b5..b999d40c78 100644
--- a/pkg/server/stream/v3/stream.go
+++ b/pkg/server/stream/v3/stream.go
@@ -21,69 +21,3 @@ type DeltaStream interface {
 	Send(*discovery.DeltaDiscoveryResponse) error
 	Recv() (*discovery.DeltaDiscoveryRequest, error)
 }
-
-// StreamState will keep track of resource cache state per type on a stream.
-type StreamState struct { // nolint:golint,revive
-	// Indicates whether the delta stream currently has a wildcard watch
-	wildcard bool
-
-	// Provides the list of resources explicitly requested by the client
-	// This list might be non-empty even when set as wildcard
-	subscribedResourceNames map[string]struct{}
-
-	// ResourceVersions contains a hash of the resource as the value and the resource name as the key.
-	// This field stores the last state sent to the client.
-	resourceVersions map[string]string
-
-	// Ordered indicates whether we want an ordered ADS stream or not
-	ordered bool
-}
-
-// NewStreamState initializes a stream state.
-func NewStreamState(wildcard bool, initialResourceVersions map[string]string) StreamState {
-	state := StreamState{
-		wildcard:                wildcard,
-		subscribedResourceNames: map[string]struct{}{},
-		resourceVersions:        initialResourceVersions,
-		ordered:                 false, // Ordered comes from the first request since that's when we discover if they want ADS
-	}
-
-	if initialResourceVersions == nil {
-		state.resourceVersions = make(map[string]string)
-	}
-
-	return state
-}
-
-// GetSubscribedResourceNames returns the list of resources currently explicitly subscribed to
-// If the request is set to wildcard it may be empty
-// Currently populated only when using delta-xds
-func (s StreamState) GetSubscribedResources() map[string]struct{} {
-	return s.subscribedResourceNames
-}
-
-// SetSubscribedResourceNames is setting the list of resources currently explicitly subscribed to
-// It is decorrelated from the wildcard state of the stream
-// Currently used only when using delta-xds
-func (s *StreamState) SetSubscribedResources(subscribedResourceNames map[string]struct{}) {
-	s.subscribedResourceNames = subscribedResourceNames
-}
-
-func (s StreamState) GetKnownResources() map[string]string {
-	return s.resourceVersions
-}
-
-// SetResourceVersions sets a list of resource versions by type URL and removes the flag
-// of "first" since we can safely assume another request has come through the stream.
-func (s *StreamState) SetResourceVersions(resourceVersions map[string]string) {
-	s.resourceVersions = resourceVersions
-}
-
-func (s *StreamState) SetWildcard(wildcard bool) {
-	s.wildcard = wildcard
-}
-
-// IsWildcard returns whether or not an xDS client requested in wildcard mode on the initial request.
-func (s StreamState) IsWildcard() bool {
-	return s.wildcard
-}
diff --git a/pkg/server/stream/v3/subscription.go b/pkg/server/stream/v3/subscription.go
new file mode 100644
index 0000000000..c5a17b46b8
--- /dev/null
+++ b/pkg/server/stream/v3/subscription.go
@@ -0,0 +1,81 @@
+package stream
+
+// SubscriptionState will keep track of a resource subscription on a stream.
+type SubscriptionState struct {
+	// wildcard is set if the subscription currently has a wildcard watch
+	wildcard bool
+
+	// subscribedResourceNames provides the resources explicitly requested by the client
+	// This list might be non-empty even when set as wildcard
+	subscribedResourceNames map[string]struct{}
+
+	// resourceVersions contains the resources acknowledged by the client and the versions
+	// associated to them
+	resourceVersions map[string]string
+}
+
+// NewSubscriptionState initializes a stream state.
+func NewSubscriptionState(wildcard bool, initialResourceVersions map[string]string) SubscriptionState {
+	state := SubscriptionState{
+		wildcard:                wildcard,
+		subscribedResourceNames: map[string]struct{}{},
+		resourceVersions:        initialResourceVersions,
+	}
+
+	if initialResourceVersions == nil {
+		state.resourceVersions = make(map[string]string)
+	}
+
+	return state
+}
+
+// GetSubscribedResources returns the list of resources currently explicitly subscribed to
+// If the request is set to wildcard it may be empty
+// Currently populated only when using delta-xds
+func (s SubscriptionState) GetSubscribedResources() map[string]struct{} {
+	return s.subscribedResourceNames
+}
+
+// SetSubscribedResources is setting the list of resources currently explicitly subscribed to
+// It is decorrelated from the wildcard state of the stream
+// Currently used only when using delta-xds
+func (s *SubscriptionState) SetSubscribedResources(subscribedResourceNames map[string]struct{}) {
+	s.subscribedResourceNames = subscribedResourceNames
+}
+
+// GetKnownResources returns the list of resources acknowledged by the client
+// and their acknowledged version
+func (s SubscriptionState) GetKnownResources() map[string]string {
+	return s.resourceVersions
+}
+
+// SetKnownResources sets a list of resource versions currently known by the client
+// The cache can use this state to compute resources added/updated/deleted
+func (s *SubscriptionState) SetKnownResources(resourceVersions map[string]string) {
+	s.resourceVersions = resourceVersions
+}
+
+// SetWildcard will set the subscription to return all known resources
+func (s *SubscriptionState) SetWildcard(wildcard bool) {
+	s.wildcard = wildcard
+}
+
+// IsWildcard returns whether or not the subscription currently has a wildcard watch
+func (s SubscriptionState) IsWildcard() bool {
+	return s.wildcard
+}
+
+// WatchesResources returns whether at least one of the resources provided is currently being watched by the subscription.
+// If the request is wildcard, it will always return true,
+// otherwise it will compare the provided resources to the list of resources currently subscribed
+func (s SubscriptionState) WatchesResources(resourceNames map[string]struct{}) bool {
+	if s.wildcard {
+		return true
+	}
+	for resourceName := range resourceNames {
+		if _, ok := s.subscribedResourceNames[resourceName]; ok {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/server/v3/delta_test.go b/pkg/server/v3/delta_test.go
index 99a8039f28..476a1fe378 100644
--- a/pkg/server/v3/delta_test.go
+++ b/pkg/server/v3/delta_test.go
@@ -19,7 +19,7 @@ import (
 	"github.com/envoyproxy/go-control-plane/pkg/test/resource/v3"
 )
 
-func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryRequest, state cache.ClientState, out chan cache.DeltaResponse) func() {
+func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryRequest, state cache.SubscriptionState, out chan cache.DeltaResponse) (func(), error) {
 	config.deltaCounts[req.TypeUrl] = config.deltaCounts[req.TypeUrl] + 1
 
 	// This is duplicated from pkg/cache/v3/delta.go as private there
@@ -87,10 +87,10 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR
 		config.deltaWatches++
 		return func() {
 			config.deltaWatches--
-		}
+		}, nil
 	}
 
-	return nil
+	return nil, nil
 }
 
 type mockDeltaStream struct {
diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go
index b8cac11cfd..905a0b5351 100644
--- a/pkg/server/v3/server_test.go
+++ b/pkg/server/v3/server_test.go
@@ -48,7 +48,7 @@ type mockConfigWatcher struct {
 	mu *sync.RWMutex
 }
 
-func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, state cache.ClientState, out chan cache.Response) func() {
+func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, _ cache.SubscriptionState, out chan cache.Response) (func(), error) {
 	config.counts[req.TypeUrl] = config.counts[req.TypeUrl] + 1
 	if len(config.responses[req.TypeUrl]) > 0 {
 		out <- config.responses[req.TypeUrl][0]
@@ -57,9 +57,9 @@ func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, st
 		config.watches++
 		return func() {
 			config.watches--
-		}
+		}, nil
 	}
-	return nil
+	return nil, nil
 }
 
 func (config *mockConfigWatcher) Fetch(_ context.Context, req *discovery.DiscoveryRequest) (cache.Response, error) {