From a382ab94fa9da915454757eb7fd3789db6ab2474 Mon Sep 17 00:00:00 2001 From: rambohe Date: Sat, 25 Jan 2025 00:14:10 +1100 Subject: [PATCH] feat: improve yurthub configmap management (#2275) Signed-off-by: rambohe-ch --- cmd/yurthub/app/config/config.go | 7 +- cmd/yurthub/app/options/filters.go | 59 +- cmd/yurthub/app/start.go | 2 +- pkg/yurthub/cachemanager/cache_agent.go | 150 ----- pkg/yurthub/cachemanager/cache_agent_test.go | 88 --- pkg/yurthub/cachemanager/cache_manager.go | 15 +- .../cachemanager/cache_manager_test.go | 33 +- pkg/yurthub/configuration/manager.go | 288 ++++++++++ pkg/yurthub/configuration/manager_test.go | 337 +++++++++++ pkg/yurthub/filter/approver/approver.go | 263 +-------- pkg/yurthub/filter/approver/approver_test.go | 538 +----------------- pkg/yurthub/filter/base/base.go | 6 +- .../filter/discardcloudservice/filter.go | 7 - .../filter/discardcloudservice/filter_test.go | 19 - .../filter/forwardkubesvctraffic/filter.go | 7 - .../forwardkubesvctraffic/filter_test.go | 19 - pkg/yurthub/filter/inclusterconfig/filter.go | 7 - .../filter/inclusterconfig/filter_test.go | 19 - pkg/yurthub/filter/interfaces.go | 4 - pkg/yurthub/filter/manager/manager.go | 23 +- pkg/yurthub/filter/manager/manager_test.go | 7 +- pkg/yurthub/filter/masterservice/filter.go | 7 - .../filter/masterservice/filter_test.go | 19 - .../filter/nodeportisolation/filter.go | 6 - .../filter/nodeportisolation/filter_test.go | 19 - pkg/yurthub/filter/responsefilter/filter.go | 29 +- .../filter/responsefilter/filter_test.go | 12 +- .../filter/serviceenvupdater/filter.go | 7 - .../filter/serviceenvupdater/filter_test.go | 17 - pkg/yurthub/filter/servicetopology/filter.go | 8 - .../filter/servicetopology/filter_test.go | 19 - pkg/yurthub/proxy/autonomy/autonomy_test.go | 9 +- pkg/yurthub/proxy/local/local_test.go | 34 +- pkg/yurthub/yurtcoordinator/coordinator.go | 6 +- 34 files changed, 823 insertions(+), 1267 deletions(-) delete mode 100644 pkg/yurthub/cachemanager/cache_agent.go delete mode 100644 pkg/yurthub/cachemanager/cache_agent_test.go create mode 100644 pkg/yurthub/configuration/manager.go create mode 100644 pkg/yurthub/configuration/manager_test.go diff --git a/cmd/yurthub/app/config/config.go b/cmd/yurthub/app/config/config.go index 54373cbb096..31f2ea7ce3c 100644 --- a/cmd/yurthub/app/config/config.go +++ b/cmd/yurthub/app/config/config.go @@ -51,6 +51,7 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" "github.com/openyurtio/openyurt/pkg/yurthub/certificate" certificatemgr "github.com/openyurtio/openyurt/pkg/yurthub/certificate/manager" + "github.com/openyurtio/openyurt/pkg/yurthub/configuration" "github.com/openyurtio/openyurt/pkg/yurthub/filter" "github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer" "github.com/openyurtio/openyurt/pkg/yurthub/filter/manager" @@ -121,6 +122,7 @@ type YurtHubConfiguration struct { PostStartHooks map[string]func() error RequestMultiplexerManager multiplexer.MultiplexerManager MultiplexerResources []schema.GroupVersionResource + ConfigManager *configuration.Manager } // Complete converts *options.YurtHubOptions to *YurtHubConfiguration @@ -158,7 +160,9 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { } tenantNs := util.ParseTenantNsFromOrgs(options.YurtHubCertOrganizations) registerInformers(options, sharedFactory, workingMode, tenantNs) - filterFinder, err := manager.NewFilterManager(options, sharedFactory, dynamicSharedFactory, proxiedClient, serializerManager) + + configManager := configuration.NewConfigurationManager(options.NodeName, sharedFactory) + filterFinder, err := manager.NewFilterManager(options, sharedFactory, dynamicSharedFactory, proxiedClient, serializerManager, configManager) if err != nil { klog.Errorf("could not create filter manager, %v", err) return nil, err @@ -198,6 +202,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { HostControlPlaneAddr: options.HostControlPlaneAddr, MultiplexerResources: AllowedMultiplexerResources, RequestMultiplexerManager: newMultiplexerCacheManager(options), + ConfigManager: configManager, } // if yurthub is in local mode, certMgr and networkMgr are no need to start diff --git a/cmd/yurthub/app/options/filters.go b/cmd/yurthub/app/options/filters.go index 1e344daaa11..75de1241a03 100644 --- a/cmd/yurthub/app/options/filters.go +++ b/cmd/yurthub/app/options/filters.go @@ -31,15 +31,56 @@ var ( // DisabledInCloudMode contains the filters that should be disabled when yurthub is working in cloud mode. DisabledInCloudMode = []string{discardcloudservice.FilterName, forwardkubesvctraffic.FilterName, serviceenvupdater.FilterName} - // SupportedComponentsForFilter is used for specifying which components are supported by filters as default setting. - SupportedComponentsForFilter = map[string]string{ - masterservice.FilterName: "kubelet", - discardcloudservice.FilterName: "kube-proxy", - servicetopology.FilterName: "kube-proxy, coredns, nginx-ingress-controller", - inclusterconfig.FilterName: "kubelet", - nodeportisolation.FilterName: "kube-proxy", - forwardkubesvctraffic.FilterName: "kube-proxy", - serviceenvupdater.FilterName: "kubelet", + // FilterToComponentsResourcesAndVerbs is used to specify which request with resource and verb from component is supported by the filter. + // When adding a new filter, It is essential to update the FilterToComponentsResourcesAndVerbs map + // to include this new filter along with the component, resource and request verbs it supports. + FilterToComponentsResourcesAndVerbs = map[string]struct { + DefaultComponents []string + ResourceAndVerbs map[string][]string + }{ + masterservice.FilterName: { + DefaultComponents: []string{"kubelet"}, + ResourceAndVerbs: map[string][]string{ + "services": {"list", "watch"}, + }, + }, + discardcloudservice.FilterName: { + DefaultComponents: []string{"kube-proxy"}, + ResourceAndVerbs: map[string][]string{ + "services": {"list", "watch"}, + }, + }, + servicetopology.FilterName: { + DefaultComponents: []string{"kube-proxy", "coredns", "nginx-ingress-controller"}, + ResourceAndVerbs: map[string][]string{ + "endpoints": {"list", "watch"}, + "endpointslices": {"list", "watch"}, + }, + }, + inclusterconfig.FilterName: { + DefaultComponents: []string{"kubelet"}, + ResourceAndVerbs: map[string][]string{ + "configmaps": {"get", "list", "watch"}, + }, + }, + nodeportisolation.FilterName: { + DefaultComponents: []string{"kube-proxy"}, + ResourceAndVerbs: map[string][]string{ + "services": {"list", "watch"}, + }, + }, + forwardkubesvctraffic.FilterName: { + DefaultComponents: []string{"kube-proxy"}, + ResourceAndVerbs: map[string][]string{ + "endpointslices": {"list", "watch"}, + }, + }, + serviceenvupdater.FilterName: { + DefaultComponents: []string{"kubelet"}, + ResourceAndVerbs: map[string][]string{ + "pods": {"list", "watch", "get", "patch"}, + }, + }, } ) diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go index 21969d3855c..52709dfc827 100644 --- a/cmd/yurthub/app/start.go +++ b/cmd/yurthub/app/start.go @@ -136,7 +136,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error { var cacheMgr cachemanager.CacheManager if cfg.WorkingMode == util.WorkingModeEdge { klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace) - cacheMgr = cachemanager.NewCacheManager(cfg.NodeName, cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory) + cacheMgr = cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.ConfigManager) } else { klog.Infof("%d. disable cache manager for node %s because it is a cloud node", trace, cfg.NodeName) } diff --git a/pkg/yurthub/cachemanager/cache_agent.go b/pkg/yurthub/cachemanager/cache_agent.go deleted file mode 100644 index ba17b8cfd16..00000000000 --- a/pkg/yurthub/cachemanager/cache_agent.go +++ /dev/null @@ -1,150 +0,0 @@ -/* -Copyright 2020 The OpenYurt Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cachemanager - -import ( - "strings" - "sync" - - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/informers" - "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" - - "github.com/openyurtio/openyurt/pkg/yurthub/util" -) - -const ( - sepForAgent = "," -) - -type CacheAgent struct { - sync.Mutex - agents sets.Set[string] - store StorageWrapper - nodeName string -} - -func NewCacheAgents(nodeName string, informerFactory informers.SharedInformerFactory, store StorageWrapper) *CacheAgent { - ca := &CacheAgent{ - agents: sets.New(util.DefaultCacheAgents...).Insert(util.MultiplexerProxyClientUserAgentPrefix + nodeName), - store: store, - nodeName: nodeName, - } - configmapInformer := informerFactory.Core().V1().ConfigMaps().Informer() - configmapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: ca.addConfigmap, - UpdateFunc: ca.updateConfigmap, - DeleteFunc: ca.deleteConfigmap, - }) - - klog.Infof("init cache agents to %v", ca.agents) - return ca -} - -func (ca *CacheAgent) HasAny(items ...string) bool { - newAgents := make([]string, 0, len(items)) - for i := range items { - if n := strings.Index(items[i], "/partialobjectmetadata"); n != -1 { - newAgents = append(newAgents, items[i][:n]) - } else { - newAgents = append(newAgents, items[i]) - } - } - return ca.agents.HasAny(newAgents...) -} - -func (ca *CacheAgent) addConfigmap(obj interface{}) { - cfg, ok := obj.(*corev1.ConfigMap) - if !ok { - return - } - - deletedAgents := ca.updateCacheAgents(cfg.Data[util.CacheUserAgentsKey], "add") - ca.deleteAgentCache(deletedAgents) -} - -func (ca *CacheAgent) updateConfigmap(oldObj, newObj interface{}) { - oldCfg, ok := oldObj.(*corev1.ConfigMap) - if !ok { - return - } - - newCfg, ok := newObj.(*corev1.ConfigMap) - if !ok { - return - } - - if oldCfg.Data[util.CacheUserAgentsKey] == newCfg.Data[util.CacheUserAgentsKey] { - return - } - - deletedAgents := ca.updateCacheAgents(newCfg.Data[util.CacheUserAgentsKey], "update") - ca.deleteAgentCache(deletedAgents) -} - -func (ca *CacheAgent) deleteConfigmap(obj interface{}) { - _, ok := obj.(*corev1.ConfigMap) - if !ok { - return - } - - deletedAgents := ca.updateCacheAgents("", "delete") - ca.deleteAgentCache(deletedAgents) -} - -// updateCacheAgents update cache agents -func (ca *CacheAgent) updateCacheAgents(cacheAgents, action string) sets.Set[string] { - newAgents := sets.New(util.DefaultCacheAgents...).Insert(util.MultiplexerProxyClientUserAgentPrefix + ca.nodeName) - for _, agent := range strings.Split(cacheAgents, sepForAgent) { - agent = strings.TrimSpace(agent) - if len(agent) != 0 { - newAgents.Insert(agent) - } - } - - ca.Lock() - defer ca.Unlock() - - if ca.agents.Equal(newAgents) { - return sets.Set[string]{} - } - - // get deleted and added agents - deletedAgents := ca.agents.Difference(newAgents) - ca.agents = newAgents - - klog.Infof("current cache agents: %v after %s, deleted agents: %v", ca.agents, action, deletedAgents) - - // return deleted agents - return deletedAgents -} - -func (ca *CacheAgent) deleteAgentCache(deletedAgents sets.Set[string]) { - // delete cache data for deleted agents - if deletedAgents.Len() > 0 { - components := deletedAgents.UnsortedList() - for i := range components { - if err := ca.store.DeleteComponentResources(components[i]); err != nil { - klog.Errorf("could not cleanup cache for deleted agent(%s), %v", components[i], err) - } else { - klog.Infof("cleanup cache for agent(%s) successfully", components[i]) - } - } - } -} diff --git a/pkg/yurthub/cachemanager/cache_agent_test.go b/pkg/yurthub/cachemanager/cache_agent_test.go deleted file mode 100644 index bf38d001e28..00000000000 --- a/pkg/yurthub/cachemanager/cache_agent_test.go +++ /dev/null @@ -1,88 +0,0 @@ -/* -Copyright 2020 The OpenYurt Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cachemanager - -import ( - "strings" - "testing" - - "k8s.io/apimachinery/pkg/util/sets" - - "github.com/openyurtio/openyurt/pkg/yurthub/util" -) - -func TestUpdateCacheAgents(t *testing.T) { - testcases := map[string]struct { - desc string - initAgents []string - cacheAgents string - resultAgents sets.Set[string] - deletedAgents sets.Set[string] - }{ - "two new agents updated": { - initAgents: []string{}, - cacheAgents: "agent1,agent2", - resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"), - deletedAgents: sets.Set[string]{}, - }, - "two new agents updated but an old agent deleted": { - initAgents: []string{"agent1", "agent2"}, - cacheAgents: "agent2,agent3", - resultAgents: sets.New(append([]string{"agent2", "agent3"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"), - deletedAgents: sets.New("agent1"), - }, - "no agents updated ": { - initAgents: []string{"agent1", "agent2"}, - cacheAgents: "agent1,agent2", - resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"), - deletedAgents: sets.New[string](), - }, - "no agents updated with default": { - initAgents: []string{"agent1", "agent2", "kubelet"}, - cacheAgents: "agent1,agent2", - resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"), - deletedAgents: sets.New[string](), - }, - "empty agents added ": { - initAgents: []string{}, - cacheAgents: "", - resultAgents: sets.New(util.DefaultCacheAgents...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"), - deletedAgents: sets.New[string](), - }, - } - for k, tt := range testcases { - t.Run(k, func(t *testing.T) { - m := &CacheAgent{ - agents: sets.New(tt.initAgents...), - nodeName: "iz2ze21g5pq9jbesubrksvz", - } - - m.updateCacheAgents(strings.Join(tt.initAgents, ","), "") - - // add agents - deletedAgents := m.updateCacheAgents(tt.cacheAgents, "") - - if !deletedAgents.Equal(tt.deletedAgents) { - t.Errorf("Got deleted agents: %v, expect agents: %v", deletedAgents, tt.deletedAgents) - } - - if !m.agents.Equal(tt.resultAgents) { - t.Errorf("Got cache agents: %v, expect agents: %v", m.agents, tt.resultAgents) - } - }) - } -} diff --git a/pkg/yurthub/cachemanager/cache_manager.go b/pkg/yurthub/cachemanager/cache_manager.go index 9c976059de1..9618718d7bc 100644 --- a/pkg/yurthub/cachemanager/cache_manager.go +++ b/pkg/yurthub/cachemanager/cache_manager.go @@ -38,10 +38,10 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" apirequest "k8s.io/apiserver/pkg/endpoints/request" - "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/scheme" "k8s.io/klog/v2" + "github.com/openyurtio/openyurt/pkg/yurthub/configuration" hubmeta "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" "github.com/openyurtio/openyurt/pkg/yurthub/storage" @@ -77,25 +77,23 @@ type cacheManager struct { storage StorageWrapper serializerManager *serializer.SerializerManager restMapperManager *hubmeta.RESTMapperManager - cacheAgents *CacheAgent + configManager *configuration.Manager listSelectorCollector map[storage.Key]string inMemoryCache map[string]runtime.Object } // NewCacheManager creates a new CacheManager func NewCacheManager( - nodeName string, storagewrapper StorageWrapper, serializerMgr *serializer.SerializerManager, restMapperMgr *hubmeta.RESTMapperManager, - sharedFactory informers.SharedInformerFactory, + configManager *configuration.Manager, ) CacheManager { - cacheAgents := NewCacheAgents(nodeName, sharedFactory, storagewrapper) cm := &cacheManager{ storage: storagewrapper, serializerManager: serializerMgr, - cacheAgents: cacheAgents, restMapperManager: restMapperMgr, + configManager: configManager, listSelectorCollector: make(map[storage.Key]string), inMemoryCache: make(map[string]runtime.Object), } @@ -747,12 +745,9 @@ func (cm *cacheManager) CanCacheFor(req *http.Request) bool { if ok && canCache { // request with Edge-Cache header, continue verification } else { - cm.RLock() - if !cm.cacheAgents.HasAny("*", comp) { - cm.RUnlock() + if !cm.configManager.IsCacheable(comp) { return false } - cm.RUnlock() } info, ok := apirequest.RequestInfoFrom(ctx) diff --git a/pkg/yurthub/cachemanager/cache_manager_test.go b/pkg/yurthub/cachemanager/cache_manager_test.go index 3f06abcd079..baf4bf6fb05 100644 --- a/pkg/yurthub/cachemanager/cache_manager_test.go +++ b/pkg/yurthub/cachemanager/cache_manager_test.go @@ -45,6 +45,7 @@ import ( "k8s.io/client-go/tools/cache" "github.com/openyurtio/openyurt/pkg/projectinfo" + "github.com/openyurtio/openyurt/pkg/yurthub/configuration" hubmeta "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" proxyutil "github.com/openyurtio/openyurt/pkg/yurthub/proxy/util" @@ -54,9 +55,7 @@ import ( ) var ( - rootDir = "/tmp/cache-manager" - fakeClient = fake.NewSimpleClientset() - fakeSharedInformerFactory = informers.NewSharedInformerFactory(fakeClient, 0) + rootDir = "/tmp/cache-manager" ) func TestCacheGetResponse(t *testing.T) { @@ -70,7 +69,10 @@ func TestCacheGetResponse(t *testing.T) { } sWrapper := NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory) + + fakeSharedInformerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0) + configManager := configuration.NewConfigurationManager("node1", fakeSharedInformerFactory) + yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, configManager) testcases := map[string]struct { inputObj runtime.Object @@ -607,7 +609,10 @@ func TestCacheWatchResponse(t *testing.T) { } sWrapper := NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory) + + fakeSharedInformerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0) + configManager := configuration.NewConfigurationManager("node1", fakeSharedInformerFactory) + yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, configManager) testcases := map[string]struct { inputObj []watch.Event @@ -1014,7 +1019,10 @@ func TestCacheListResponse(t *testing.T) { if err != nil { t.Errorf("failed to create RESTMapper manager, %v", err) } - yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory) + + fakeSharedInformerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0) + configManager := configuration.NewConfigurationManager("node1", fakeSharedInformerFactory) + yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, configManager) testcases := map[string]struct { inputObj runtime.Object @@ -1607,7 +1615,10 @@ func TestQueryCacheForGet(t *testing.T) { if err != nil { t.Errorf("failed to create RESTMapper manager, %v", err) } - yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory) + + fakeSharedInformerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0) + configManager := configuration.NewConfigurationManager("node1", fakeSharedInformerFactory) + yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, configManager) testcases := map[string]struct { keyBuildInfo storage.KeyBuildInfo @@ -2334,7 +2345,10 @@ func TestQueryCacheForList(t *testing.T) { if err != nil { t.Errorf("failed to create RESTMapper manager, %v", err) } - yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory) + + fakeSharedInformerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0) + configManager := configuration.NewConfigurationManager("node1", fakeSharedInformerFactory) + yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, configManager) testcases := map[string]struct { keyBuildInfo *storage.KeyBuildInfo @@ -3158,7 +3172,8 @@ func TestCanCacheFor(t *testing.T) { defer close(stop) client := fake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(client, 0) - m := NewCacheManager("node1", s, nil, nil, informerFactory) + configManager := configuration.NewConfigurationManager("node1", informerFactory) + m := NewCacheManager(s, nil, nil, configManager) informerFactory.Start(nil) cache.WaitForCacheSync(stop, informerFactory.Core().V1().ConfigMaps().Informer().HasSynced) if tt.preRequest != nil { diff --git a/pkg/yurthub/configuration/manager.go b/pkg/yurthub/configuration/manager.go new file mode 100644 index 00000000000..9a7fcfd8882 --- /dev/null +++ b/pkg/yurthub/configuration/manager.go @@ -0,0 +1,288 @@ +/* +Copyright 2025 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configuration + +import ( + "fmt" + "net/http" + "reflect" + "strings" + "sync" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + + "github.com/openyurtio/openyurt/cmd/yurthub/app/options" + "github.com/openyurtio/openyurt/pkg/projectinfo" + "github.com/openyurtio/openyurt/pkg/yurthub/util" +) + +const ( + cacheUserAgentsKey = "cache_agents" + sepForAgent = "," +) + +var ( + defaultCacheAgents = []string{"kubelet", "kube-proxy", "flanneld", "coredns", "raven-agent-ds", projectinfo.GetAgentName(), projectinfo.GetHubName()} +) + +// Manager is used for managing all configurations of Yurthub in yurt-hub-cfg configmap. +// This configuration configmap includes configurations of cache agents and filters. I'm sure that new +// configurations will be added according to user's new requirements. +type Manager struct { + sync.RWMutex + baseAgents []string + allCacheAgents sets.Set[string] + baseKeyToFilters map[string][]string + reqKeyToFilters map[string][]string + configMapSynced cache.InformerSynced +} + +func NewConfigurationManager(nodeName string, sharedFactory informers.SharedInformerFactory) *Manager { + configmapInformer := sharedFactory.Core().V1().ConfigMaps().Informer() + m := &Manager{ + baseAgents: append(defaultCacheAgents, util.MultiplexerProxyClientUserAgentPrefix+nodeName), + allCacheAgents: sets.New[string](), + baseKeyToFilters: make(map[string][]string), + reqKeyToFilters: make(map[string][]string), + configMapSynced: configmapInformer.HasSynced, + } + + // init cache agents + m.updateCacheAgents("", "init") + for filterName, req := range options.FilterToComponentsResourcesAndVerbs { + for _, comp := range req.DefaultComponents { + for resource, verbs := range req.ResourceAndVerbs { + for _, verb := range verbs { + if key := reqKey(comp, verb, resource); len(key) != 0 { + if _, ok := m.baseKeyToFilters[key]; !ok { + m.baseKeyToFilters[key] = []string{filterName} + } else { + m.baseKeyToFilters[key] = append(m.baseKeyToFilters[key], filterName) + } + } + } + } + } + } + // init filter settings + m.updateFilterSettings(map[string]string{}, "init") + + // prepare configmap event handler + configmapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: m.addConfigmap, + UpdateFunc: m.updateConfigmap, + DeleteFunc: m.deleteConfigmap, + }) + return m +} + +// HasSynced is used for checking that configuration of Yurthub has been loaded completed or not. +func (m *Manager) HasSynced() bool { + return m.configMapSynced() +} + +// ListAllCacheAgents is used for listing all cache agents. +func (m *Manager) ListAllCacheAgents() []string { + m.RLock() + defer m.RUnlock() + + return m.allCacheAgents.UnsortedList() +} + +// IsCacheable is used for checking that http response of specified component +// should be cached on the local disk or not. +func (m *Manager) IsCacheable(comp string) bool { + if strings.Contains(comp, "/") { + index := strings.Index(comp, "/") + if index != -1 { + comp = comp[:index] + } + } + + m.RLock() + defer m.RUnlock() + return m.allCacheAgents.HasAny("*", comp) +} + +// FindFiltersFor is used for finding all filters for the specified request. +// the return value represents all the filter names for the request. +func (m *Manager) FindFiltersFor(req *http.Request) []string { + key := getKeyByRequest(req) + if len(key) == 0 { + return []string{} + } + + m.RLock() + defer m.RUnlock() + if filters, ok := m.reqKeyToFilters[key]; ok { + return filters + } + return []string{} +} + +func (m *Manager) addConfigmap(obj interface{}) { + cfg, ok := obj.(*corev1.ConfigMap) + if !ok { + return + } + + m.updateCacheAgents(cfg.Data[cacheUserAgentsKey], "add") + m.updateFilterSettings(cfg.Data, "add") +} + +func (m *Manager) updateConfigmap(oldObj, newObj interface{}) { + oldCfg, ok := oldObj.(*corev1.ConfigMap) + if !ok { + return + } + + newCfg, ok := newObj.(*corev1.ConfigMap) + if !ok { + return + } + + if oldCfg.Data[cacheUserAgentsKey] != newCfg.Data[cacheUserAgentsKey] { + m.updateCacheAgents(newCfg.Data[cacheUserAgentsKey], "update") + } + + if filterSettingsChanged(oldCfg.Data, newCfg.Data) { + m.updateFilterSettings(newCfg.Data, "update") + } +} + +func (m *Manager) deleteConfigmap(obj interface{}) { + _, ok := obj.(*corev1.ConfigMap) + if !ok { + return + } + m.updateCacheAgents("", "delete") + m.updateFilterSettings(map[string]string{}, "delete") +} + +// updateCacheAgents update cache agents +// todo: cache on the local disk should be removed when agent is deleted. +func (m *Manager) updateCacheAgents(cacheAgents, action string) { + newAgents := make([]string, 0) + newAgents = append(newAgents, m.baseAgents...) + for _, agent := range strings.Split(cacheAgents, sepForAgent) { + agent = strings.TrimSpace(agent) + if len(agent) != 0 { + newAgents = append(newAgents, agent) + } + } + + klog.Infof("After action %s, the cache agents are as follows: %v", action, newAgents) + m.Lock() + defer m.Unlock() + m.allCacheAgents.Clear() + m.allCacheAgents.Insert(newAgents...) +} + +// filterSettingsChanged is used to verify filter setting is changed or not. +func filterSettingsChanged(old, new map[string]string) bool { + oldCopy := make(map[string]string) + newCopy := make(map[string]string) + for key, val := range old { + if _, ok := options.FilterToComponentsResourcesAndVerbs[key]; ok { + oldCopy[key] = val + } + } + + for key, val := range new { + if _, ok := options.FilterToComponentsResourcesAndVerbs[key]; ok { + newCopy[key] = val + } + } + + // if filter setting of old and new equal, return false. + // vice versa, return true. + return !reflect.DeepEqual(oldCopy, newCopy) +} + +func (m *Manager) updateFilterSettings(cmData map[string]string, action string) { + // prepare the default filter settings + reqKeyToFilterSet := make(map[string]sets.Set[string]) + for key, filterNames := range m.baseKeyToFilters { + reqKeyToFilterSet[key] = sets.New[string](filterNames...) + } + + // add filter settings from configmap + for filterName, components := range cmData { + if req, ok := options.FilterToComponentsResourcesAndVerbs[filterName]; ok { + for _, comp := range strings.Split(components, sepForAgent) { + for resource, verbs := range req.ResourceAndVerbs { + for _, verb := range verbs { + if key := reqKey(comp, verb, resource); len(key) != 0 { + if _, ok := reqKeyToFilterSet[key]; !ok { + reqKeyToFilterSet[key] = sets.New[string](filterName) + } else { + reqKeyToFilterSet[key].Insert(filterName) + } + } + } + } + } + } + } + + reqKeyToFilters := make(map[string][]string) + for key, filterSet := range reqKeyToFilterSet { + reqKeyToFilters[key] = filterSet.UnsortedList() + } + + klog.Infof("After action %s, the filter settings are as follows: %v", action, reqKeyToFilters) + m.Lock() + defer m.Unlock() + m.reqKeyToFilters = reqKeyToFilters +} + +// getKeyByRequest returns reqKey for specified request. +func getKeyByRequest(req *http.Request) string { + var key string + ctx := req.Context() + comp, ok := util.ClientComponentFrom(ctx) + if !ok { + return key + } + + if strings.Contains(comp, "/") { + index := strings.Index(comp, "/") + if index != -1 { + comp = comp[:index] + } + } + + info, ok := apirequest.RequestInfoFrom(ctx) + if !ok { + return key + } + + return reqKey(comp, info.Verb, info.Resource) +} + +// reqKey is made up by comp and verb, resource +func reqKey(comp, verb, resource string) string { + if len(comp) == 0 || len(resource) == 0 || len(verb) == 0 { + return "" + } + return fmt.Sprintf("%s/%s/%s", strings.TrimSpace(comp), strings.TrimSpace(verb), strings.TrimSpace(resource)) +} diff --git a/pkg/yurthub/configuration/manager_test.go b/pkg/yurthub/configuration/manager_test.go new file mode 100644 index 00000000000..85f138be62e --- /dev/null +++ b/pkg/yurthub/configuration/manager_test.go @@ -0,0 +1,337 @@ +/* +Copyright 2025 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configuration + +import ( + "context" + "net/http" + "strings" + "testing" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" + + "github.com/openyurtio/openyurt/pkg/projectinfo" + "github.com/openyurtio/openyurt/pkg/yurthub/filter/discardcloudservice" + "github.com/openyurtio/openyurt/pkg/yurthub/filter/forwardkubesvctraffic" + "github.com/openyurtio/openyurt/pkg/yurthub/filter/nodeportisolation" + "github.com/openyurtio/openyurt/pkg/yurthub/filter/serviceenvupdater" + "github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology" + "github.com/openyurtio/openyurt/pkg/yurthub/util" +) + +func TestManager(t *testing.T) { + testcases := map[string]struct { + nodeName string + addCM *v1.ConfigMap + updateCM *v1.ConfigMap + deleteCM *v1.ConfigMap + initAgents sets.Set[string] + addedAgents sets.Set[string] + updatedAgents sets.Set[string] + deletedAgents sets.Set[string] + cacheableAgents []string + comp string + initFilterSet map[string]sets.Set[string] + addedFilterSet map[string]sets.Set[string] + updatedFilterSet map[string]sets.Set[string] + deletedFilterSet map[string]sets.Set[string] + }{ + "check the init status of configuration manager": { + nodeName: "foo", + initAgents: sets.New[string]("kubelet", "kube-proxy", "flanneld", "coredns", "raven-agent-ds", projectinfo.GetAgentName(), projectinfo.GetHubName(), util.MultiplexerProxyClientUserAgentPrefix+"foo"), + cacheableAgents: []string{"kubelet", "kube-proxy", "flanneld", "coredns", "raven-agent-ds", projectinfo.GetAgentName(), projectinfo.GetHubName(), util.MultiplexerProxyClientUserAgentPrefix + "foo"}, + initFilterSet: map[string]sets.Set[string]{ + "kubelet/list/pods": sets.New[string](serviceenvupdater.FilterName), + "kubelet/watch/pods": sets.New[string](serviceenvupdater.FilterName), + "kubelet/get/pods": sets.New[string](serviceenvupdater.FilterName), + "kubelet/patch/pods": sets.New[string](serviceenvupdater.FilterName), + "kube-proxy/list/endpoints": sets.New[string](servicetopology.FilterName), + "kube-proxy/list/endpointslices": sets.New[string](servicetopology.FilterName, forwardkubesvctraffic.FilterName), + "foo/list/pods": sets.New[string](), + }, + }, + "check the configuration manager status after adding settings": { + nodeName: "foo", + addCM: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "yurt-hub-cfg", + Namespace: "kube-system", + }, + Data: map[string]string{ + cacheUserAgentsKey: "foo, bar", + servicetopology.FilterName: "foo", + }, + }, + initAgents: sets.New[string]("kubelet", "kube-proxy", "flanneld", "coredns", "raven-agent-ds", projectinfo.GetAgentName(), projectinfo.GetHubName(), util.MultiplexerProxyClientUserAgentPrefix+"foo"), + addedAgents: sets.New[string]("kubelet", "kube-proxy", "flanneld", "coredns", "raven-agent-ds", projectinfo.GetAgentName(), projectinfo.GetHubName(), util.MultiplexerProxyClientUserAgentPrefix+"foo", "foo", "bar"), + cacheableAgents: []string{"kubelet", "kube-proxy", "flanneld", "coredns", "raven-agent-ds", projectinfo.GetAgentName(), projectinfo.GetHubName(), util.MultiplexerProxyClientUserAgentPrefix + "foo", "foo", "bar"}, + addedFilterSet: map[string]sets.Set[string]{ + "foo/list/endpoints": sets.New[string](servicetopology.FilterName), + "foo/watch/endpointslices": sets.New[string](servicetopology.FilterName), + }, + }, + "check the configuration manager status after adding and updating settings": { + nodeName: "foo", + addCM: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "yurt-hub-cfg", + Namespace: "kube-system", + }, + Data: map[string]string{ + cacheUserAgentsKey: "foo, bar", + servicetopology.FilterName: "bar", + }, + }, + updateCM: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "yurt-hub-cfg", + Namespace: "kube-system", + }, + Data: map[string]string{ + cacheUserAgentsKey: "foo, zag", + servicetopology.FilterName: "zag", + nodeportisolation.FilterName: "zag", + discardcloudservice.FilterName: "zag", + }, + }, + initAgents: sets.New[string]("kubelet", "kube-proxy", "flanneld", "coredns", "raven-agent-ds", projectinfo.GetAgentName(), projectinfo.GetHubName(), util.MultiplexerProxyClientUserAgentPrefix+"foo"), + addedAgents: sets.New[string]("kubelet", "kube-proxy", "flanneld", "coredns", "raven-agent-ds", projectinfo.GetAgentName(), projectinfo.GetHubName(), util.MultiplexerProxyClientUserAgentPrefix+"foo", "foo", "bar"), + updatedAgents: sets.New[string]("kubelet", "kube-proxy", "flanneld", "coredns", "raven-agent-ds", projectinfo.GetAgentName(), projectinfo.GetHubName(), util.MultiplexerProxyClientUserAgentPrefix+"foo", "foo", "zag"), + cacheableAgents: []string{"kubelet", "kube-proxy", "flanneld", "coredns", "raven-agent-ds", projectinfo.GetAgentName(), projectinfo.GetHubName(), util.MultiplexerProxyClientUserAgentPrefix + "foo", "foo/xxx", "zag/xxx"}, + addedFilterSet: map[string]sets.Set[string]{ + "bar/list/endpoints": sets.New[string](servicetopology.FilterName), + "bar/watch/endpointslices": sets.New[string](servicetopology.FilterName), + }, + updatedFilterSet: map[string]sets.Set[string]{ + "bar/list/endpoints": sets.New[string](), + "bar/watch/endpointslices": sets.New[string](), + "/watch/endpointslices": sets.New[string](), + "zag/list/endpoints": sets.New[string](servicetopology.FilterName), + "zag/watch/endpointslices": sets.New[string](servicetopology.FilterName), + "zag/list/services": sets.New[string](nodeportisolation.FilterName, discardcloudservice.FilterName), + }, + }, + "check the configuration manager status after adding and updating and deleting settings": { + nodeName: "foo", + addCM: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "yurt-hub-cfg", + Namespace: "kube-system", + }, + Data: map[string]string{ + cacheUserAgentsKey: "foo, bar", + }, + }, + updateCM: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "yurt-hub-cfg", + Namespace: "kube-system", + }, + Data: map[string]string{ + cacheUserAgentsKey: "foo, zag", + servicetopology.FilterName: "zag", + }, + }, + deleteCM: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "yurt-hub-cfg", + Namespace: "kube-system", + }, + }, + initAgents: sets.New[string]("kubelet", "kube-proxy", "flanneld", "coredns", "raven-agent-ds", projectinfo.GetAgentName(), projectinfo.GetHubName(), util.MultiplexerProxyClientUserAgentPrefix+"foo"), + addedAgents: sets.New[string]("kubelet", "kube-proxy", "flanneld", "coredns", "raven-agent-ds", projectinfo.GetAgentName(), projectinfo.GetHubName(), util.MultiplexerProxyClientUserAgentPrefix+"foo", "foo", "bar"), + updatedAgents: sets.New[string]("kubelet", "kube-proxy", "flanneld", "coredns", "raven-agent-ds", projectinfo.GetAgentName(), projectinfo.GetHubName(), util.MultiplexerProxyClientUserAgentPrefix+"foo", "foo", "zag"), + deletedAgents: sets.New[string]("kubelet", "kube-proxy", "flanneld", "coredns", "raven-agent-ds", projectinfo.GetAgentName(), projectinfo.GetHubName(), util.MultiplexerProxyClientUserAgentPrefix+"foo"), + cacheableAgents: []string{"kubelet", "kube-proxy", "flanneld", "coredns", "raven-agent-ds", projectinfo.GetAgentName(), projectinfo.GetHubName(), util.MultiplexerProxyClientUserAgentPrefix + "foo"}, + comp: "zag/xxx", + updatedFilterSet: map[string]sets.Set[string]{ + "/list/endpoints": sets.New[string](servicetopology.FilterName), + "/watch/endpointslices": sets.New[string](servicetopology.FilterName), + }, + deletedFilterSet: map[string]sets.Set[string]{ + "kube-proxy/list/endpoints": sets.New[string](servicetopology.FilterName), + "kube-proxy/list/endpointslices": sets.New[string](servicetopology.FilterName, forwardkubesvctraffic.FilterName), + "/list/endpoints": sets.New[string](), + "/watch/endpointslices": sets.New[string](), + }, + }, + } + + for k, tc := range testcases { + t.Run(k, func(t *testing.T) { + client := fake.NewSimpleClientset() + informerfactory := informers.NewSharedInformerFactory(client, 0) + manager := NewConfigurationManager(tc.nodeName, informerfactory) + + stopCh := make(chan struct{}) + informerfactory.Start(stopCh) + defer close(stopCh) + + if ok := cache.WaitForCacheSync(stopCh, manager.HasSynced); !ok { + t.Errorf("configuration manager is not ready") + return + } + + initAgents := sets.New[string](manager.ListAllCacheAgents()...) + if !initAgents.Equal(tc.initAgents) { + t.Errorf("expect init agents %v, but got %v", tc.initAgents.UnsortedList(), initAgents.UnsortedList()) + return + } + + if len(tc.initFilterSet) != 0 { + for key, filterSet := range tc.initFilterSet { + parts := strings.Split(key, "/") + req := new(http.Request) + comp := parts[0] + if len(parts[0]) == 0 && tc.comp != "" { + comp = tc.comp + } + ctx := util.WithClientComponent(context.Background(), comp) + ctx = apirequest.WithRequestInfo(ctx, &apirequest.RequestInfo{Verb: parts[1], Resource: parts[2]}) + req = req.WithContext(ctx) + + filters := manager.FindFiltersFor(req) + if !filterSet.Equal(sets.New[string](filters...)) { + t.Errorf("expect filters %v, but got %v", filterSet.UnsortedList(), filters) + } + } + } + + // add configmap + if tc.addCM != nil { + _, err := client.CoreV1().ConfigMaps("kube-system").Create(context.Background(), tc.addCM, metav1.CreateOptions{}) + if err != nil { + t.Errorf("couldn't create configmap, %v", err) + return + } + + time.Sleep(time.Second * 1) + addedAgents := sets.New[string](manager.ListAllCacheAgents()...) + if !addedAgents.Equal(tc.addedAgents) { + t.Errorf("expect added agents %v, but got %v", tc.addedAgents.UnsortedList(), addedAgents.UnsortedList()) + return + } + + if len(tc.addedFilterSet) != 0 { + for key, filterSet := range tc.addedFilterSet { + parts := strings.Split(key, "/") + req := new(http.Request) + comp := parts[0] + if len(parts[0]) == 0 && tc.comp != "" { + comp = tc.comp + } + ctx := util.WithClientComponent(context.Background(), comp) + ctx = apirequest.WithRequestInfo(ctx, &apirequest.RequestInfo{Verb: parts[1], Resource: parts[2]}) + req = req.WithContext(ctx) + + filters := manager.FindFiltersFor(req) + if !filterSet.Equal(sets.New[string](filters...)) { + t.Errorf("expect filters %v, but got %v", filterSet.UnsortedList(), filters) + } + } + } + } + + // update configmap + if tc.updateCM != nil { + _, err := client.CoreV1().ConfigMaps("kube-system").Update(context.Background(), tc.updateCM, metav1.UpdateOptions{}) + if err != nil { + t.Errorf("couldn't update configmap, %v", err) + return + } + + time.Sleep(time.Second * 1) + updatedAgents := sets.New[string](manager.ListAllCacheAgents()...) + if !updatedAgents.Equal(tc.updatedAgents) { + t.Errorf("expect updated agents %v, but got %v", tc.updatedAgents.UnsortedList(), updatedAgents.UnsortedList()) + return + } + + if len(tc.updatedFilterSet) != 0 { + for key, filterSet := range tc.updatedFilterSet { + parts := strings.Split(key, "/") + req := new(http.Request) + comp := parts[0] + if len(parts[0]) == 0 && tc.comp != "" { + comp = tc.comp + } + ctx := util.WithClientComponent(context.Background(), comp) + ctx = apirequest.WithRequestInfo(ctx, &apirequest.RequestInfo{Verb: parts[1], Resource: parts[2]}) + req = req.WithContext(ctx) + + filters := manager.FindFiltersFor(req) + if !filterSet.Equal(sets.New[string](filters...)) { + t.Errorf("expect filters %v, but got %v", filterSet.UnsortedList(), filters) + } + } + } + } + + // delete configmap + if tc.deleteCM != nil { + err := client.CoreV1().ConfigMaps("kube-system").Delete(context.Background(), tc.deleteCM.Name, metav1.DeleteOptions{}) + if err != nil { + t.Errorf("couldn't delete configmap, %v", err) + return + } + + time.Sleep(time.Second * 1) + deletedAgents := sets.New[string](manager.ListAllCacheAgents()...) + if !deletedAgents.Equal(tc.deletedAgents) { + t.Errorf("expect deleted agents %v, but got %v", tc.deletedAgents.UnsortedList(), deletedAgents.UnsortedList()) + return + } + + if len(tc.deletedFilterSet) != 0 { + for key, filterSet := range tc.deletedFilterSet { + parts := strings.Split(key, "/") + req := new(http.Request) + comp := parts[0] + if len(parts[0]) == 0 && tc.comp != "" { + comp = tc.comp + } + ctx := util.WithClientComponent(context.Background(), comp) + ctx = apirequest.WithRequestInfo(ctx, &apirequest.RequestInfo{Verb: parts[1], Resource: parts[2]}) + req = req.WithContext(ctx) + + filters := manager.FindFiltersFor(req) + if !filterSet.Equal(sets.New[string](filters...)) { + t.Errorf("expect filters %v, but got %v", filterSet.UnsortedList(), filters) + } + } + } + } + + if len(tc.cacheableAgents) != 0 { + for _, agent := range tc.cacheableAgents { + if !manager.IsCacheable(agent) { + t.Errorf("agent(%s) is not cacheable", agent) + return + } + } + } + }) + } + +} diff --git a/pkg/yurthub/filter/approver/approver.go b/pkg/yurthub/filter/approver/approver.go index 32671baff95..3a1b5c44517 100644 --- a/pkg/yurthub/filter/approver/approver.go +++ b/pkg/yurthub/filter/approver/approver.go @@ -17,275 +17,56 @@ limitations under the License. package approver import ( - "fmt" "net/http" - "reflect" "strings" - "sync" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" - apirequest "k8s.io/apiserver/pkg/endpoints/request" - "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" - "github.com/openyurtio/openyurt/cmd/yurthub/app/options" "github.com/openyurtio/openyurt/pkg/projectinfo" + "github.com/openyurtio/openyurt/pkg/yurthub/configuration" "github.com/openyurtio/openyurt/pkg/yurthub/filter" "github.com/openyurtio/openyurt/pkg/yurthub/util" ) type approver struct { - sync.Mutex - reqKeyToNames map[string]sets.Set[string] - configMapSynced cache.InformerSynced - supportedResourceAndVerbsForFilter map[string]map[string]sets.Set[string] - defaultReqKeyToNames map[string]sets.Set[string] - stopCh chan struct{} + skipRequestUserAgentList sets.Set[string] + configManager *configuration.Manager + stopCh chan struct{} } -var ( - // defaultBlackListRequests is used for requests that don't need to be filtered. - defaultBlackListRequests = sets.NewString(reqKey(projectinfo.GetHubName(), "configmaps", "list"), reqKey(projectinfo.GetHubName(), "configmaps", "watch")) -) - -func NewApprover(sharedFactory informers.SharedInformerFactory, filterSupportedResAndVerbs map[string]map[string]sets.Set[string]) filter.Approver { - configMapInformer := sharedFactory.Core().V1().ConfigMaps().Informer() +func NewApprover(nodeName string, configManager *configuration.Manager) filter.Approver { na := &approver{ - reqKeyToNames: make(map[string]sets.Set[string]), - configMapSynced: configMapInformer.HasSynced, - supportedResourceAndVerbsForFilter: filterSupportedResAndVerbs, - defaultReqKeyToNames: make(map[string]sets.Set[string]), - stopCh: make(chan struct{}), - } - - for name, setting := range options.SupportedComponentsForFilter { - for _, key := range na.parseRequestSetting(name, setting) { - if _, ok := na.defaultReqKeyToNames[key]; !ok { - na.defaultReqKeyToNames[key] = sets.New[string]() - } - na.defaultReqKeyToNames[key].Insert(name) - } + skipRequestUserAgentList: sets.New[string](projectinfo.GetHubName(), util.MultiplexerProxyClientUserAgentPrefix+nodeName), + configManager: configManager, + stopCh: make(chan struct{}), } - - na.merge("init", na.defaultReqKeyToNames) - configMapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: na.addConfigMap, - UpdateFunc: na.updateConfigMap, - DeleteFunc: na.deleteConfigMap, - }) return na } func (a *approver) Approve(req *http.Request) (bool, []string) { - filterNames := make([]string, 0) - key := getKeyByRequest(req) - if len(key) == 0 { - return false, filterNames - } - - if defaultBlackListRequests.Has(key) { - return false, filterNames - } - - if ok := cache.WaitForCacheSync(a.stopCh, a.configMapSynced); !ok { - return false, filterNames - } - - a.Lock() - defer a.Unlock() - if nameSetting, ok := a.reqKeyToNames[key]; ok { - return true, nameSetting.UnsortedList() - } - - return false, filterNames -} - -func (a *approver) addConfigMap(obj interface{}) { - cm, ok := obj.(*corev1.ConfigMap) - if !ok { - return - } - - // get reqKeyToNames of user request setting from configmap - reqKeyToNamesFromCM := make(map[string]sets.Set[string]) - for key, setting := range cm.Data { - if filterName, ok := a.hasFilterName(key); ok { - for _, requestKey := range a.parseRequestSetting(filterName, setting) { - if _, ok := reqKeyToNamesFromCM[requestKey]; !ok { - reqKeyToNamesFromCM[requestKey] = sets.New[string]() - } - reqKeyToNamesFromCM[requestKey].Insert(filterName) - } - } - } - - // update reqKeyToNames by merging user setting - a.merge("add", reqKeyToNamesFromCM) -} - -func (a *approver) updateConfigMap(oldObj, newObj interface{}) { - oldCM, ok := oldObj.(*corev1.ConfigMap) - if !ok { - return - } - oldCM = oldCM.DeepCopy() - - newCM, ok := newObj.(*corev1.ConfigMap) - if !ok { - return - } - newCM = newCM.DeepCopy() - - // request settings are changed or not - needUpdated := a.requestSettingsUpdated(oldCM.Data, newCM.Data) - if !needUpdated { - return - } - - // get reqKeyToNames of user request setting from new configmap - reqKeyToNamesFromCM := make(map[string]sets.Set[string]) - for key, setting := range newCM.Data { - if filterName, ok := a.hasFilterName(key); ok { - for _, requestKey := range a.parseRequestSetting(filterName, setting) { - if _, ok := reqKeyToNamesFromCM[requestKey]; !ok { - reqKeyToNamesFromCM[requestKey] = sets.New[string]() - } - reqKeyToNamesFromCM[requestKey].Insert(filterName) - } - } - } - - // update reqKeyToName by merging user setting - a.merge("update", reqKeyToNamesFromCM) -} - -func (a *approver) deleteConfigMap(obj interface{}) { - _, ok := obj.(*corev1.ConfigMap) - if !ok { - return - } - - // update reqKeyToName by merging user setting - a.merge("delete", map[string]sets.Set[string]{}) -} - -// merge is used to add specified setting into reqKeyToNames map. -func (a *approver) merge(action string, keyToNamesSetting map[string]sets.Set[string]) { - a.Lock() - defer a.Unlock() - // remove current user setting from reqKeyToNames and left default setting - for key, currentNames := range a.reqKeyToNames { - if defaultNames, ok := a.defaultReqKeyToNames[key]; !ok { - delete(a.reqKeyToNames, key) - } else { - notDefaultNames := currentNames.Difference(defaultNames).UnsortedList() - a.reqKeyToNames[key].Delete(notDefaultNames...) - } - } - - // merge new user setting - for key, names := range keyToNamesSetting { - if _, ok := a.reqKeyToNames[key]; !ok { - a.reqKeyToNames[key] = sets.New[string]() - } - a.reqKeyToNames[key].Insert(names.UnsortedList()...) - } - klog.Infof("current filter setting: %v after %s", a.reqKeyToNames, action) -} - -// parseRequestSetting extract comp info from setting, and make up request keys. -// requestSetting format as following(take servicetopology for example): -// servicetopology: "comp1,comp2" -func (a *approver) parseRequestSetting(name, setting string) []string { - reqKeys := make([]string, 0) - resourceAndVerbs, ok := a.supportedResourceAndVerbsForFilter[name] - if !ok { - return reqKeys - } - - for _, comp := range strings.Split(setting, ",") { + if comp, ok := util.ClientComponentFrom(req.Context()); !ok { + return false, []string{} + } else { if strings.Contains(comp, "/") { - comp = strings.Split(comp, "/")[0] - } - for resource, verbSet := range resourceAndVerbs { - comp = strings.TrimSpace(comp) - resource = strings.TrimSpace(resource) - verbs := verbSet.UnsortedList() - - if len(comp) != 0 && len(resource) != 0 && len(verbs) != 0 { - for i := range verbs { - reqKeys = append(reqKeys, reqKey(comp, resource, strings.TrimSpace(verbs[i]))) - } + index := strings.Index(comp, "/") + if index != -1 { + comp = comp[:index] } } - } - return reqKeys -} - -// hasFilterName check the key that includes a filter name or not. -// and return filter name and check result. -func (a *approver) hasFilterName(key string) (string, bool) { - name := strings.TrimSpace(key) - if strings.HasPrefix(name, "filter_") { - name = strings.TrimSpace(strings.TrimPrefix(name, "filter_")) - } - - if _, ok := a.supportedResourceAndVerbsForFilter[name]; ok { - return name, true - } - - return "", false -} - -// requestSettingsUpdated is used to verify filter setting is changed or not. -func (a *approver) requestSettingsUpdated(old, new map[string]string) bool { - for key := range old { - if _, ok := a.hasFilterName(key); !ok { - delete(old, key) - } - } - - for key := range new { - if _, ok := a.hasFilterName(key); !ok { - delete(new, key) + if a.skipRequestUserAgentList.Has(comp) { + return false, []string{} } } - // if filter setting of old and new equal, return false. - // vice versa, return true. - return !reflect.DeepEqual(old, new) -} - -// getKeyByRequest returns reqKey for specified request. -func getKeyByRequest(req *http.Request) string { - var key string - ctx := req.Context() - comp, ok := util.ClientComponentFrom(ctx) - if !ok { - return key - } - - if strings.Contains(comp, "/") { - index := strings.Index(comp, "/") - if index != -1 { - comp = comp[:index] - } + if ok := cache.WaitForCacheSync(a.stopCh, a.configManager.HasSynced); !ok { + return false, []string{} } - info, ok := apirequest.RequestInfoFrom(ctx) - if !ok { - return key + filterNames := a.configManager.FindFiltersFor(req) + if len(filterNames) == 0 { + return false, filterNames } - return reqKey(comp, info.Resource, info.Verb) -} - -// reqKey is made up by comp and resource, verb -func reqKey(comp, resource, verb string) string { - if len(comp) == 0 || len(resource) == 0 || len(verb) == 0 { - return "" - } - return fmt.Sprintf("%s/%s/%s", comp, resource, verb) + return true, filterNames } diff --git a/pkg/yurthub/filter/approver/approver_test.go b/pkg/yurthub/filter/approver/approver_test.go index ef9473d8b00..0c140d8c12f 100644 --- a/pkg/yurthub/filter/approver/approver_test.go +++ b/pkg/yurthub/filter/approver/approver_test.go @@ -19,40 +19,25 @@ package approver import ( "net/http" "net/http/httptest" - "reflect" - "sort" "testing" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/endpoints/filters" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" - "github.com/openyurtio/openyurt/cmd/yurthub/app/options" "github.com/openyurtio/openyurt/pkg/projectinfo" + "github.com/openyurtio/openyurt/pkg/yurthub/configuration" "github.com/openyurtio/openyurt/pkg/yurthub/filter/discardcloudservice" + "github.com/openyurtio/openyurt/pkg/yurthub/filter/forwardkubesvctraffic" "github.com/openyurtio/openyurt/pkg/yurthub/filter/masterservice" + "github.com/openyurtio/openyurt/pkg/yurthub/filter/nodeportisolation" "github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/util" util2 "github.com/openyurtio/openyurt/pkg/yurthub/util" ) -var supportedResourceAndVerbsForFilter = map[string]map[string]sets.Set[string]{ - masterservice.FilterName: { - "services": sets.New[string]("list", "watch"), - }, - discardcloudservice.FilterName: { - "services": sets.New[string]("list", "watch"), - }, - servicetopology.FilterName: { - "endpoints": sets.New[string]("list", "watch"), - "endpointslices": sets.New[string]("list", "watch"), - }, -} - func newTestRequestInfoResolver() *request.RequestInfoFactory { return &request.RequestInfoFactory{ APIPrefixes: sets.NewString("api", "apis"), @@ -90,7 +75,7 @@ func TestApprove(t *testing.T) { verb: "GET", path: "/api/v1/services", approved: true, - resultFilter: []string{discardcloudservice.FilterName}, + resultFilter: []string{discardcloudservice.FilterName, nodeportisolation.FilterName}, workingMode: util2.WorkingModeCloud, }, "kube-proxy watch services": { @@ -98,7 +83,7 @@ func TestApprove(t *testing.T) { verb: "GET", path: "/api/v1/services?watch=true", approved: true, - resultFilter: []string{discardcloudservice.FilterName}, + resultFilter: []string{discardcloudservice.FilterName, nodeportisolation.FilterName}, workingMode: util2.WorkingModeEdge, }, "kube-proxy list endpointslices": { @@ -106,7 +91,7 @@ func TestApprove(t *testing.T) { verb: "GET", path: "/apis/discovery.k8s.io/v1/endpointslices", approved: true, - resultFilter: []string{servicetopology.FilterName}, + resultFilter: []string{servicetopology.FilterName, forwardkubesvctraffic.FilterName}, workingMode: util2.WorkingModeEdge, }, "kube-proxy watch endpointslices": { @@ -114,7 +99,7 @@ func TestApprove(t *testing.T) { verb: "GET", path: "/apis/discovery.k8s.io/v1/endpointslices?watch=true", approved: true, - resultFilter: []string{servicetopology.FilterName}, + resultFilter: []string{servicetopology.FilterName, forwardkubesvctraffic.FilterName}, workingMode: util2.WorkingModeEdge, }, "nginx-ingress-controller list endpoints": { @@ -156,11 +141,21 @@ func TestApprove(t *testing.T) { resultFilter: []string{}, workingMode: util2.WorkingModeCloud, }, + "watch configmaps by unknown agent": { + userAgent: "unknown-agent", + verb: "GET", + path: "/api/v1/configmaps?watch=true", + approved: false, + resultFilter: []string{}, + workingMode: util2.WorkingModeCloud, + }, } + nodeName := "foo" client := &fake.Clientset{} informerFactory := informers.NewSharedInformerFactory(client, 0) - approver := NewApprover(informerFactory, supportedResourceAndVerbsForFilter) + manager := configuration.NewConfigurationManager(nodeName, informerFactory) + approver := NewApprover(nodeName, manager) stopper := make(chan struct{}) defer close(stopper) informerFactory.Start(stopper) @@ -193,503 +188,12 @@ func TestApprove(t *testing.T) { if len(filterNames) != len(tt.resultFilter) { t.Errorf("expect is filter names is %v, but got %v", tt.resultFilter, filterNames) + return } - for i, name := range filterNames { - if tt.resultFilter[i] != name { - t.Errorf("expect is filter names is %v, but got %v", tt.resultFilter, filterNames) - } - } - }) - } -} - -func TestAddConfigMap(t *testing.T) { - approver := newApprover(supportedResourceAndVerbsForFilter) - testcases := []struct { - desc string - cm *v1.ConfigMap - resultReqKeyToNames map[string]sets.Set[string] - }{ - { - desc: "add a new filter setting", - cm: &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "yurt-hub-cfg", - Namespace: "kube-system", - }, - Data: map[string]string{ - "cache_agents": "nginx-controller", - "filter_masterservice": "foo, bar", - }, - }, - resultReqKeyToNames: mergeReqKeyMap(approver.defaultReqKeyToNames, map[string]string{ - "foo/services/list": "masterservice", - "foo/services/watch": "masterservice", - "bar/services/list": "masterservice", - "bar/services/watch": "masterservice", - }), - }, - { - desc: "no filter setting exist", - cm: &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "yurt-hub-cfg", - Namespace: "kube-system", - }, - Data: map[string]string{ - "cache_agents": "nginx-controller", - }, - }, - resultReqKeyToNames: approver.defaultReqKeyToNames, - }, - } - - for i, tt := range testcases { - t.Run(testcases[i].desc, func(t *testing.T) { - approver.addConfigMap(tt.cm) - if !reflect.DeepEqual(approver.reqKeyToNames, tt.resultReqKeyToNames) { - t.Errorf("expect reqkeyToNames is %#+v, but got %#+v", tt.resultReqKeyToNames, approver.reqKeyToNames) - } - approver.merge("cleanup", map[string]sets.Set[string]{}) - }) - } -} - -func TestUpdateConfigMap(t *testing.T) { - approver := newApprover(supportedResourceAndVerbsForFilter) - testcases := []struct { - desc string - oldCM *v1.ConfigMap - newCM *v1.ConfigMap - resultReqKeyToNames map[string]sets.Set[string] - }{ - { - desc: "add a new filter setting", - oldCM: &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "yurt-hub-cfg", - Namespace: "kube-system", - }, - Data: map[string]string{ - "cache_agents": "nginx-controller", - "filter_servicetopology": "foo, bar", - }, - }, - newCM: &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "yurt-hub-cfg", - Namespace: "kube-system", - }, - Data: map[string]string{ - "cache_agents": "nginx-controller", - "filter_discardcloudservice": "foo, bar", - }, - }, - resultReqKeyToNames: mergeReqKeyMap(approver.defaultReqKeyToNames, map[string]string{ - "foo/services/list": "discardcloudservice", - "foo/services/watch": "discardcloudservice", - "bar/services/list": "discardcloudservice", - "bar/services/watch": "discardcloudservice", - }), - }, - { - desc: "no filter setting changed", - oldCM: &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "yurt-hub-cfg", - Namespace: "kube-system", - }, - Data: map[string]string{ - "cache_agents": "nginx-controller", - "filter_servicetopology": "foo, bar", - }, - }, - newCM: &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "yurt-hub-cfg", - Namespace: "kube-system", - }, - Data: map[string]string{ - "cache_agents": "nginx-controller, agent2", - "filter_servicetopology": "foo, bar", - }, - }, - resultReqKeyToNames: approver.defaultReqKeyToNames, - }, - } - - for i, tt := range testcases { - t.Run(testcases[i].desc, func(t *testing.T) { - approver.updateConfigMap(tt.oldCM, tt.newCM) - if !reflect.DeepEqual(approver.reqKeyToNames, tt.resultReqKeyToNames) { - t.Errorf("expect reqkeyToName is %#+v, but got %#+v", tt.resultReqKeyToNames, approver.reqKeyToNames) - } - approver.merge("cleanup", map[string]sets.Set[string]{}) - }) - } -} - -func TestMerge(t *testing.T) { - approver := newApprover(supportedResourceAndVerbsForFilter) - testcases := map[string]struct { - action string - reqKeyToNamesFromCM map[string]sets.Set[string] - resultReqKeyToNames map[string]sets.Set[string] - }{ - "init req key to name": { - action: "init", - reqKeyToNamesFromCM: map[string]sets.Set[string]{}, - resultReqKeyToNames: approver.defaultReqKeyToNames, - }, - "add some items of req key to name": { - action: "add", - reqKeyToNamesFromCM: map[string]sets.Set[string]{ - "comp1/resources1/list": sets.New[string]("filter1"), - "comp2/resources2/watch": sets.New[string]("filter2"), - "comp3/resources3/watch": sets.New[string]("filter1"), - }, - resultReqKeyToNames: mergeReqKeyMap(approver.defaultReqKeyToNames, map[string]string{ - "comp1/resources1/list": "filter1", - "comp2/resources2/watch": "filter2", - "comp3/resources3/watch": "filter1", - }), - }, - "update and delete item of req key to name": { - action: "update", - reqKeyToNamesFromCM: map[string]sets.Set[string]{ - "comp1/resources1/list": sets.New[string]("filter1"), - "comp2/resources2/watch": sets.New[string]("filter3"), - }, - resultReqKeyToNames: mergeReqKeyMap(approver.defaultReqKeyToNames, map[string]string{ - "comp1/resources1/list": "filter1", - "comp2/resources2/watch": "filter3", - }), - }, - "update default setting of req key to name": { - action: "update", - reqKeyToNamesFromCM: map[string]sets.Set[string]{ - "kubelet/services/list": sets.New("filter1"), - "comp2/resources2/watch": sets.New("filter3"), - }, - resultReqKeyToNames: mergeReqKeyMap(approver.defaultReqKeyToNames, map[string]string{ - "comp2/resources2/watch": "filter3", - "kubelet/services/list": "filter1", - }), - }, - "clear all user setting of req key to name": { - action: "update", - reqKeyToNamesFromCM: map[string]sets.Set[string]{}, - resultReqKeyToNames: approver.defaultReqKeyToNames, - }, - } - - for k, tt := range testcases { - t.Run(k, func(t *testing.T) { - approver.merge(tt.action, tt.reqKeyToNamesFromCM) - if !reflect.DeepEqual(approver.reqKeyToNames, tt.resultReqKeyToNames) { - t.Errorf("expect to get reqKeyToName %#+v, but got %#+v", tt.resultReqKeyToNames, approver.reqKeyToNames) - } - }) - } - -} - -func TestParseRequestSetting(t *testing.T) { - approver := newApprover(supportedResourceAndVerbsForFilter) - testcases := map[string]struct { - filterName string - filterSetting string - resultKeys []string - }{ - "old normal filter setting has two components": { - filterName: masterservice.FilterName, - filterSetting: "foo/services#list;watch,bar/services#list;watch", - resultKeys: []string{"foo/services/list", "foo/services/watch", "bar/services/list", "bar/services/watch"}, - }, - "normal filter setting has one component": { - filterName: masterservice.FilterName, - filterSetting: "foo", - resultKeys: []string{"foo/services/list", "foo/services/watch"}, - }, - "normal filter setting has two components": { - filterName: masterservice.FilterName, - filterSetting: "foo, bar", - resultKeys: []string{"foo/services/list", "foo/services/watch", "bar/services/list", "bar/services/watch"}, - }, - "invalid filter name": { - filterName: "unknown filter", - filterSetting: "foo", - resultKeys: []string{}, - }, - } - - for k, tt := range testcases { - t.Run(k, func(t *testing.T) { - keys := approver.parseRequestSetting(tt.filterName, tt.filterSetting) - sort.Strings(keys) - sort.Strings(tt.resultKeys) - - if !reflect.DeepEqual(keys, tt.resultKeys) { - t.Errorf("expect request keys %#+v, but got %#+v", tt.resultKeys, keys) - } - }) - } -} - -func TestHasFilterName(t *testing.T) { - approver := newApprover(supportedResourceAndVerbsForFilter) - testcases := map[string]struct { - key string - expectFilterName string - isFilter bool - }{ - "it's not filter": { - key: "cache_agents", - expectFilterName: "", - isFilter: false, - }, - "it's a filter": { - key: "filter_masterservice", - expectFilterName: "masterservice", - isFilter: true, - }, - "only has filter prefix": { - key: "filter_", - expectFilterName: "", - isFilter: false, - }, - "it's a servicetopology filter": { - key: "servicetopology", - expectFilterName: "servicetopology", - isFilter: true, - }, - } - - for k, tt := range testcases { - t.Run(k, func(t *testing.T) { - name, ok := approver.hasFilterName(tt.key) - if name != tt.expectFilterName { - t.Errorf("expect filter name is %s, but got %s", tt.expectFilterName, name) - } - - if ok != tt.isFilter { - t.Errorf("expect has filter bool is %v, but got %v", tt.isFilter, ok) - } - }) - } -} - -func TestRequestSettingsUpdated(t *testing.T) { - approver := newApprover(supportedResourceAndVerbsForFilter) - testcases := map[string]struct { - old map[string]string - new map[string]string - result bool - }{ - "filter setting is not changed": { - old: map[string]string{ - "filter_endpoints": "coredns/endpoints#list;watch", - "filter_servicetopology": "coredns/endpointslices#list;watch", - "filter_discardcloudservice": "", - "filter_masterservice": "", - }, - new: map[string]string{ - "filter_endpoints": "coredns/endpoints#list;watch", - "filter_servicetopology": "coredns/endpointslices#list;watch", - "filter_discardcloudservice": "", - "filter_masterservice": "", - }, - result: false, - }, - "non-filter setting is changed": { - old: map[string]string{ - "cache_agents": "foo", - "filter_endpoints": "coredns/endpoints#list;watch", - "filter_servicetopology": "coredns/endpointslices#list;watch", - "filter_discardcloudservice": "", - "filter_masterservice": "", - }, - new: map[string]string{ - "cache_agents": "bar", - "filter_endpoints": "coredns/endpoints#list;watch", - "filter_servicetopology": "coredns/endpointslices#list;watch", - "filter_discardcloudservice": "", - "filter_masterservice": "", - }, - result: false, - }, - "filter setting is changed": { - old: map[string]string{ - "filter_endpoints": "coredns/endpoints#list;watch", - "filter_servicetopology": "coredns/endpointslices#list;watch", - "filter_discardcloudservice": "", - "filter_masterservice": "", - }, - new: map[string]string{ - "filter_endpoints": "coredns/endpoints#list;watch", - "filter_servicetopology": "coredns/endpointslices#list;watch", - "filter_discardcloudservice": "coredns/services#list;watch", - "filter_masterservice": "", - }, - result: true, - }, - "no prefix filter setting is changed": { - old: map[string]string{ - "servicetopology": "coredns", - "discardcloudservice": "", - "masterservice": "", - }, - new: map[string]string{ - "servicetopology": "coredns", - "discardcloudservice": "coredns", - "masterservice": "", - }, - result: true, - }, - } - - for k, tt := range testcases { - t.Run(k, func(t *testing.T) { - needUpdated := approver.requestSettingsUpdated(tt.old, tt.new) - if needUpdated != tt.result { - t.Errorf("expect need updated is %v, but got %v", tt.result, needUpdated) - } - }) - } -} - -func TestGetKeyByRequest(t *testing.T) { - testcases := map[string]struct { - userAgent string - path string - resultKey string - }{ - "list pods by kubelet": { - userAgent: "kubelet", - path: "/api/v1/pods", - resultKey: "kubelet/pods/list", - }, - "list nodes by flanneld": { - userAgent: "flanneld/v1.2", - path: "/api/v1/nodes", - resultKey: "flanneld/nodes/list", - }, - "list nodes without component": { - path: "/api/v1/nodes", - resultKey: "", - }, - "list nodes with empty component": { - userAgent: "", - path: "/api/v1/nodes", - resultKey: "", - }, - } - - resolver := newTestRequestInfoResolver() - for k, tt := range testcases { - t.Run(k, func(t *testing.T) { - req, err := http.NewRequest("GET", tt.path, nil) - if err != nil { - t.Errorf("failed to create request, %v", err) - } - req.RemoteAddr = "127.0.0.1" - - if len(tt.userAgent) != 0 { - req.Header.Set("User-Agent", tt.userAgent) - } - - var requestKey string - var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - requestKey = getKeyByRequest(req) - }) - - handler = util.WithRequestClientComponent(handler, util2.WorkingModeEdge) - handler = filters.WithRequestInfo(handler, resolver) - handler.ServeHTTP(httptest.NewRecorder(), req) - - if requestKey != tt.resultKey { - t.Errorf("expect req key is %s, but got %s", tt.resultKey, requestKey) - } - }) - } -} - -func TestReqKey(t *testing.T) { - testcases := map[string]struct { - comp string - resource string - verb string - result string - }{ - "comp is empty": { - resource: "service", - verb: "get", - result: "", - }, - "resource is empty": { - comp: "kubelet", - verb: "get", - result: "", - }, - "verb is empty": { - comp: "kubelet", - resource: "pod", - result: "", - }, - "normal request": { - comp: "kubelet", - resource: "pod", - verb: "get", - result: "kubelet/pod/get", - }, - } - - for k, tt := range testcases { - t.Run(k, func(t *testing.T) { - key := reqKey(tt.comp, tt.resource, tt.verb) - if key != tt.result { - t.Errorf("expect req key %s, but got %s", tt.result, key) + if !sets.New[string](filterNames...).Equal(sets.New[string](tt.resultFilter...)) { + t.Errorf("expect is filter names is %v, but got %v", tt.resultFilter, filterNames) } }) } } - -func mergeReqKeyMap(base map[string]sets.Set[string], m map[string]string) map[string]sets.Set[string] { - reqKeyToNames := make(map[string]sets.Set[string]) - for k, v := range base { - reqKeyToNames[k] = sets.New[string](v.UnsortedList()...) - } - - for k, v := range m { - if _, ok := reqKeyToNames[k]; ok { - reqKeyToNames[k].Insert(v) - } else { - reqKeyToNames[k] = sets.New[string](v) - } - } - - return reqKeyToNames -} - -func newApprover(filterSupportedResAndVerbs map[string]map[string]sets.Set[string]) *approver { - na := &approver{ - reqKeyToNames: make(map[string]sets.Set[string]), - supportedResourceAndVerbsForFilter: filterSupportedResAndVerbs, - stopCh: make(chan struct{}), - } - - defaultReqKeyToFilterNames := make(map[string]sets.Set[string]) - for name, setting := range options.SupportedComponentsForFilter { - for _, key := range na.parseRequestSetting(name, setting) { - if _, ok := defaultReqKeyToFilterNames[key]; !ok { - defaultReqKeyToFilterNames[key] = sets.New[string]() - } - defaultReqKeyToFilterNames[key].Insert(name) - } - } - na.defaultReqKeyToNames = defaultReqKeyToFilterNames - - na.merge("init", na.defaultReqKeyToNames) - return na -} diff --git a/pkg/yurthub/filter/base/base.go b/pkg/yurthub/filter/base/base.go index 5fa4b99dfb2..56a9b2fc259 100644 --- a/pkg/yurthub/filter/base/base.go +++ b/pkg/yurthub/filter/base/base.go @@ -43,8 +43,8 @@ func NewFilters(disabledFilters []string) *Filters { } } -func (fs *Filters) NewFromFilters(initializer filter.Initializer) ([]filter.ObjectFilter, error) { - var filters = make([]filter.ObjectFilter, 0) +func (fs *Filters) NewFromFilters(initializer filter.Initializer) (map[string]filter.ObjectFilter, error) { + var filters = make(map[string]filter.ObjectFilter) for _, name := range fs.names { if fs.Enabled(name) { factory, found := fs.registry[name] @@ -63,7 +63,7 @@ func (fs *Filters) NewFromFilters(initializer filter.Initializer) ([]filter.Obje return nil, err } klog.V(2).Infof("filter %s initialize successfully", name) - filters = append(filters, ins) + filters[name] = ins } else { klog.V(2).Infof("filter %s is disabled", name) } diff --git a/pkg/yurthub/filter/discardcloudservice/filter.go b/pkg/yurthub/filter/discardcloudservice/filter.go index ac41cea303d..fc75377496f 100644 --- a/pkg/yurthub/filter/discardcloudservice/filter.go +++ b/pkg/yurthub/filter/discardcloudservice/filter.go @@ -21,7 +21,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" "github.com/openyurtio/openyurt/pkg/yurthub/filter" @@ -62,12 +61,6 @@ func (sf *discardCloudServiceFilter) Name() string { return FilterName } -func (sf *discardCloudServiceFilter) SupportedResourceAndVerbs() map[string]sets.Set[string] { - return map[string]sets.Set[string]{ - "services": sets.New("list", "watch"), - } -} - func (sf *discardCloudServiceFilter) Filter(obj runtime.Object, _ <-chan struct{}) runtime.Object { switch v := obj.(type) { case *v1.Service: diff --git a/pkg/yurthub/filter/discardcloudservice/filter_test.go b/pkg/yurthub/filter/discardcloudservice/filter_test.go index 40959fd736b..38bccf891b5 100644 --- a/pkg/yurthub/filter/discardcloudservice/filter_test.go +++ b/pkg/yurthub/filter/discardcloudservice/filter_test.go @@ -23,7 +23,6 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" "github.com/openyurtio/openyurt/pkg/util" "github.com/openyurtio/openyurt/pkg/yurthub/filter/base" @@ -44,24 +43,6 @@ func TestName(t *testing.T) { } } -func TestSupportedResourceAndVerbs(t *testing.T) { - dcsf, _ := NewDiscardCloudServiceFilter() - rvs := dcsf.SupportedResourceAndVerbs() - if len(rvs) != 1 { - t.Errorf("supported more than one resources, %v", rvs) - } - - for resource, verbs := range rvs { - if resource != "services" { - t.Errorf("expect resource is services, but got %s", resource) - } - - if !verbs.Equal(sets.New("list", "watch")) { - t.Errorf("expect verbs are list/watch, but got %v", verbs.UnsortedList()) - } - } -} - func TestFilter(t *testing.T) { testcases := map[string]struct { responseObj runtime.Object diff --git a/pkg/yurthub/filter/forwardkubesvctraffic/filter.go b/pkg/yurthub/filter/forwardkubesvctraffic/filter.go index f63d9c5feff..8120493176e 100644 --- a/pkg/yurthub/filter/forwardkubesvctraffic/filter.go +++ b/pkg/yurthub/filter/forwardkubesvctraffic/filter.go @@ -21,7 +21,6 @@ import ( discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" utilnet "k8s.io/utils/net" @@ -60,12 +59,6 @@ func (fkst *forwardKubeSVCTrafficFilter) Name() string { return FilterName } -func (fkst *forwardKubeSVCTrafficFilter) SupportedResourceAndVerbs() map[string]sets.Set[string] { - return map[string]sets.Set[string]{ - "endpointslices": sets.New("list", "watch"), - } -} - func (fkst *forwardKubeSVCTrafficFilter) SetMasterServiceHost(host string) error { fkst.host = host if utilnet.IsIPv6String(host) { diff --git a/pkg/yurthub/filter/forwardkubesvctraffic/filter_test.go b/pkg/yurthub/filter/forwardkubesvctraffic/filter_test.go index 70d85bcc626..3b2c5e8b47a 100644 --- a/pkg/yurthub/filter/forwardkubesvctraffic/filter_test.go +++ b/pkg/yurthub/filter/forwardkubesvctraffic/filter_test.go @@ -25,7 +25,6 @@ import ( discovery "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/utils/ptr" "github.com/openyurtio/openyurt/pkg/util" @@ -47,24 +46,6 @@ func TestName(t *testing.T) { } } -func TestSupportedResourceAndVerbs(t *testing.T) { - fkst, _ := NewForwardKubeSVCTrafficFilter() - rvs := fkst.SupportedResourceAndVerbs() - if len(rvs) != 1 { - t.Errorf("supported more than one resources, %v", rvs) - } - - for resource, verbs := range rvs { - if resource != "endpointslices" { - t.Errorf("expect resource is endpointslices, but got %s", resource) - } - - if !verbs.Equal(sets.New("list", "watch")) { - t.Errorf("expect verbs are list/watch, but got %v", verbs.UnsortedList()) - } - } -} - func TestFilter(t *testing.T) { portName := "https" diff --git a/pkg/yurthub/filter/inclusterconfig/filter.go b/pkg/yurthub/filter/inclusterconfig/filter.go index 8fb1f426b32..5396964aa55 100644 --- a/pkg/yurthub/filter/inclusterconfig/filter.go +++ b/pkg/yurthub/filter/inclusterconfig/filter.go @@ -22,7 +22,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" "github.com/openyurtio/openyurt/pkg/yurthub/filter" @@ -57,12 +56,6 @@ func (iccf *inClusterConfigFilter) Name() string { return FilterName } -func (iccf *inClusterConfigFilter) SupportedResourceAndVerbs() map[string]sets.Set[string] { - return map[string]sets.Set[string]{ - "configmaps": sets.New("get", "list", "watch"), - } -} - func (iccf *inClusterConfigFilter) Filter(obj runtime.Object, _ <-chan struct{}) runtime.Object { switch v := obj.(type) { case *v1.ConfigMap: diff --git a/pkg/yurthub/filter/inclusterconfig/filter_test.go b/pkg/yurthub/filter/inclusterconfig/filter_test.go index 577bfd2ea97..4f7a2d7e00c 100644 --- a/pkg/yurthub/filter/inclusterconfig/filter_test.go +++ b/pkg/yurthub/filter/inclusterconfig/filter_test.go @@ -23,7 +23,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" "github.com/openyurtio/openyurt/pkg/util" "github.com/openyurtio/openyurt/pkg/yurthub/filter/base" @@ -44,24 +43,6 @@ func TestName(t *testing.T) { } } -func TestSupportedResourceAndVerbs(t *testing.T) { - iccf, _ := NewInClusterConfigFilter() - rvs := iccf.SupportedResourceAndVerbs() - if len(rvs) != 1 { - t.Errorf("supported more than one resources, %v", rvs) - } - - for resource, verbs := range rvs { - if resource != "configmaps" { - t.Errorf("expect resource is services, but got %s", resource) - } - - if !verbs.Equal(sets.New("get", "list", "watch")) { - t.Errorf("expect verbs are get/list/watch, but got %v", verbs.UnsortedList()) - } - } -} - func TestRuntimeObjectFilter(t *testing.T) { iccf, _ := NewInClusterConfigFilter() diff --git a/pkg/yurthub/filter/interfaces.go b/pkg/yurthub/filter/interfaces.go index 6da552a6281..e33b0e0a3c2 100644 --- a/pkg/yurthub/filter/interfaces.go +++ b/pkg/yurthub/filter/interfaces.go @@ -22,7 +22,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" ) type NodesInPoolGetter func(poolName string) ([]string, error) @@ -51,9 +50,6 @@ type ResponseFilter interface { // Every Filter need to implement ObjectFilter interface. type ObjectFilter interface { Name() string - // SupportedResourceAndVerbs is used to specify which resource and request verb is supported by the filter. - // Because each filter can make sure what requests with resource and verb can be handled. - SupportedResourceAndVerbs() map[string]sets.Set[string] // Filter is used for filtering runtime object // all filter logic should be located in it. Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object diff --git a/pkg/yurthub/filter/manager/manager.go b/pkg/yurthub/filter/manager/manager.go index 1469d7346df..7e0fdb4edd1 100644 --- a/pkg/yurthub/filter/manager/manager.go +++ b/pkg/yurthub/filter/manager/manager.go @@ -20,12 +20,12 @@ import ( "net/http" "strconv" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" yurtoptions "github.com/openyurtio/openyurt/cmd/yurthub/app/options" + "github.com/openyurtio/openyurt/pkg/yurthub/configuration" "github.com/openyurtio/openyurt/pkg/yurthub/filter" "github.com/openyurtio/openyurt/pkg/yurthub/filter/approver" "github.com/openyurtio/openyurt/pkg/yurthub/filter/base" @@ -46,7 +46,8 @@ func NewFilterManager(options *yurtoptions.YurtHubOptions, sharedFactory informers.SharedInformerFactory, dynamicSharedFactory dynamicinformer.DynamicSharedInformerFactory, proxiedClient kubernetes.Interface, - serializerManager *serializer.SerializerManager) (filter.FilterFinder, error) { + serializerManager *serializer.SerializerManager, + configManager *configuration.Manager) (filter.FilterFinder, error) { if !options.EnableResourceFilter { return nil, nil } @@ -72,25 +73,17 @@ func NewFilterManager(options *yurtoptions.YurtHubOptions, initializerChain = append(initializerChain, genericInitializer, nodesInitializer) // 4. initialize all object filters - objFilters, err := filters.NewFromFilters(initializerChain) + nameToFilters, err := filters.NewFromFilters(initializerChain) if err != nil { return nil, err } // 5. new filter manager including approver and nameToObjectFilter - m := &Manager{ - nameToObjectFilter: make(map[string]filter.ObjectFilter), + return &Manager{ + Approver: approver.NewApprover(options.NodeName, configManager), + nameToObjectFilter: nameToFilters, serializerManager: serializerManager, - } - - filterSupportedResAndVerbs := make(map[string]map[string]sets.Set[string]) - for i := range objFilters { - m.nameToObjectFilter[objFilters[i].Name()] = objFilters[i] - filterSupportedResAndVerbs[objFilters[i].Name()] = objFilters[i].SupportedResourceAndVerbs() - } - m.Approver = approver.NewApprover(sharedFactory, filterSupportedResAndVerbs) - - return m, nil + }, nil } func (m *Manager) FindResponseFilter(req *http.Request) (filter.ResponseFilter, bool) { diff --git a/pkg/yurthub/filter/manager/manager_test.go b/pkg/yurthub/filter/manager/manager_test.go index f67d7e73117..6169b3254a9 100644 --- a/pkg/yurthub/filter/manager/manager_test.go +++ b/pkg/yurthub/filter/manager/manager_test.go @@ -34,6 +34,7 @@ import ( "github.com/openyurtio/openyurt/cmd/yurthub/app/options" "github.com/openyurtio/openyurt/pkg/apis" + "github.com/openyurtio/openyurt/pkg/yurthub/configuration" "github.com/openyurtio/openyurt/pkg/yurthub/filter" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/util" @@ -128,10 +129,11 @@ func TestFindResponseFilter(t *testing.T) { sharedFactory, nodePoolFactory := informers.NewSharedInformerFactory(fakeClient, 24*time.Hour), dynamicinformer.NewDynamicSharedInformerFactory(fakeDynamicClient, 24*time.Hour) + configManager := configuration.NewConfigurationManager(options.NodeName, sharedFactory) stopper := make(chan struct{}) defer close(stopper) - finder, _ := NewFilterManager(options, sharedFactory, nodePoolFactory, fakeClient, serializerManager) + finder, _ := NewFilterManager(options, sharedFactory, nodePoolFactory, fakeClient, serializerManager, configManager) if tt.mgrIsNil && finder == nil { return } @@ -259,10 +261,11 @@ func TestFindObjectFilter(t *testing.T) { sharedFactory, nodePoolFactory := informers.NewSharedInformerFactory(fakeClient, 24*time.Hour), dynamicinformer.NewDynamicSharedInformerFactory(fakeDynamicClient, 24*time.Hour) + configManager := configuration.NewConfigurationManager(options.NodeName, sharedFactory) stopper := make(chan struct{}) defer close(stopper) - finder, _ := NewFilterManager(options, sharedFactory, nodePoolFactory, fakeClient, serializerManager) + finder, _ := NewFilterManager(options, sharedFactory, nodePoolFactory, fakeClient, serializerManager, configManager) if tt.mgrIsNil && finder == nil { return } diff --git a/pkg/yurthub/filter/masterservice/filter.go b/pkg/yurthub/filter/masterservice/filter.go index 654719f5d3c..4753ddb03e4 100644 --- a/pkg/yurthub/filter/masterservice/filter.go +++ b/pkg/yurthub/filter/masterservice/filter.go @@ -21,7 +21,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" "github.com/openyurtio/openyurt/pkg/yurthub/filter" @@ -58,12 +57,6 @@ func (msf *masterServiceFilter) Name() string { return FilterName } -func (msf *masterServiceFilter) SupportedResourceAndVerbs() map[string]sets.Set[string] { - return map[string]sets.Set[string]{ - "services": sets.New("list", "watch"), - } -} - func (msf *masterServiceFilter) SetMasterServiceHost(host string) error { msf.host = host return nil diff --git a/pkg/yurthub/filter/masterservice/filter_test.go b/pkg/yurthub/filter/masterservice/filter_test.go index 2fea48385f3..a7c8bf5e9d0 100644 --- a/pkg/yurthub/filter/masterservice/filter_test.go +++ b/pkg/yurthub/filter/masterservice/filter_test.go @@ -23,7 +23,6 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" "github.com/openyurtio/openyurt/pkg/util" "github.com/openyurtio/openyurt/pkg/yurthub/filter/base" @@ -44,24 +43,6 @@ func TestName(t *testing.T) { } } -func TestSupportedResourceAndVerbs(t *testing.T) { - msf, _ := NewMasterServiceFilter() - rvs := msf.SupportedResourceAndVerbs() - if len(rvs) != 1 { - t.Errorf("supported more than one resources, %v", rvs) - } - - for resource, verbs := range rvs { - if resource != "services" { - t.Errorf("expect resource is services, but got %s", resource) - } - - if !verbs.Equal(sets.New("list", "watch")) { - t.Errorf("expect verbs are list/watch, but got %v", verbs.UnsortedList()) - } - } -} - func TestFilter(t *testing.T) { masterServiceHost := "169.251.2.1" var masterServicePort int32 diff --git a/pkg/yurthub/filter/nodeportisolation/filter.go b/pkg/yurthub/filter/nodeportisolation/filter.go index 21aee4633e0..54b73f3df2f 100644 --- a/pkg/yurthub/filter/nodeportisolation/filter.go +++ b/pkg/yurthub/filter/nodeportisolation/filter.go @@ -62,12 +62,6 @@ func (nif *nodePortIsolationFilter) Name() string { return FilterName } -func (nif *nodePortIsolationFilter) SupportedResourceAndVerbs() map[string]sets.Set[string] { - return map[string]sets.Set[string]{ - "services": sets.New("list", "watch"), - } -} - func (nif *nodePortIsolationFilter) SetNodePoolName(name string) error { nif.nodePoolName = name return nil diff --git a/pkg/yurthub/filter/nodeportisolation/filter_test.go b/pkg/yurthub/filter/nodeportisolation/filter_test.go index 43d87f941bc..cb574d856ac 100644 --- a/pkg/yurthub/filter/nodeportisolation/filter_test.go +++ b/pkg/yurthub/filter/nodeportisolation/filter_test.go @@ -23,7 +23,6 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes/fake" "github.com/openyurtio/openyurt/pkg/projectinfo" @@ -46,24 +45,6 @@ func TestName(t *testing.T) { } } -func TestSupportedResourceAndVerbs(t *testing.T) { - nif, _ := NewNodePortIsolationFilter() - rvs := nif.SupportedResourceAndVerbs() - if len(rvs) != 1 { - t.Errorf("supported more than one resources, %v", rvs) - } - - for resource, verbs := range rvs { - if resource != "services" { - t.Errorf("expect resource is services, but got %s", resource) - } - - if !verbs.Equal(sets.New("list", "watch")) { - t.Errorf("expect verbs are list/watch, but got %v", verbs.UnsortedList()) - } - } -} - func TestSetNodePoolName(t *testing.T) { nif := &nodePortIsolationFilter{} if err := nif.SetNodePoolName("nodepool1"); err != nil { diff --git a/pkg/yurthub/filter/responsefilter/filter.go b/pkg/yurthub/filter/responsefilter/filter.go index 87c24401c10..70a5a50cb8d 100644 --- a/pkg/yurthub/filter/responsefilter/filter.go +++ b/pkg/yurthub/filter/responsefilter/filter.go @@ -94,24 +94,21 @@ func newFilterReadCloser( // Read get data into p and write into pipe func (frc *filterReadCloser) Read(p []byte) (int, error) { - var ok bool - if frc.isWatch { - if frc.filterCache.Len() != 0 { - return frc.filterCache.Read(p) - } else { - frc.filterCache.Reset() - } - - select { - case frc.filterCache, ok = <-frc.watchDataCh: - if !ok { - return 0, io.EOF - } - return frc.filterCache.Read(p) - } - } else { + // direct read if not watching or if cache has data + if !frc.isWatch || frc.filterCache.Len() != 0 { return frc.filterCache.Read(p) } + + // frc.isWatch is true and cache is empty + frc.filterCache.Reset() + + var ok bool + if frc.filterCache, ok = <-frc.watchDataCh; !ok { + return 0, io.EOF + } + + // read from the filterCache after receiving new data + return frc.filterCache.Read(p) } // Close will close readers diff --git a/pkg/yurthub/filter/responsefilter/filter_test.go b/pkg/yurthub/filter/responsefilter/filter_test.go index be3e0f8c24a..50d46e7d099 100644 --- a/pkg/yurthub/filter/responsefilter/filter_test.go +++ b/pkg/yurthub/filter/responsefilter/filter_test.go @@ -422,12 +422,10 @@ func TestResponseFilterForListRequest(t *testing.T) { poolName := "foo" masterHost := "169.254.2.1" masterPort := "10268" - var masterPortInt int32 - masterPortInt = 10268 + masterPortInt := int32(10268) readyCondition := true portName := "https" - var kasPort int32 - kasPort = 443 + kasPort := int32(443) scheme := runtime.NewScheme() apis.AddToScheme(scheme) nodeBucketGVRToListKind := map[schema.GroupVersionResource]string{ @@ -2623,10 +2621,14 @@ func TestResponseFilterForListRequest(t *testing.T) { baseFilters := base.NewFilters([]string{}) options.RegisterAllFilters(baseFilters) - objectFilters, err := baseFilters.NewFromFilters(initializerChain) + nameToFilter, err := baseFilters.NewFromFilters(initializerChain) if err != nil { t.Errorf("couldn't new object filters, %v", err) } + objectFilters := make([]filter.ObjectFilter, 0, len(nameToFilter)) + for _, objFilter := range nameToFilter { + objectFilters = append(objectFilters, objFilter) + } s := serializerManager.CreateSerializer(tc.accept, tc.group, tc.version, tc.resource) encoder, err := s.Encoder(tc.accept, nil) diff --git a/pkg/yurthub/filter/serviceenvupdater/filter.go b/pkg/yurthub/filter/serviceenvupdater/filter.go index 6ca73517b9a..b501c9de522 100644 --- a/pkg/yurthub/filter/serviceenvupdater/filter.go +++ b/pkg/yurthub/filter/serviceenvupdater/filter.go @@ -19,7 +19,6 @@ package serviceenvupdater import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" "github.com/openyurtio/openyurt/pkg/yurthub/filter" "github.com/openyurtio/openyurt/pkg/yurthub/filter/base" @@ -54,12 +53,6 @@ func (sef *serviceEnvUpdaterFilter) Name() string { return FilterName } -func (sef *serviceEnvUpdaterFilter) SupportedResourceAndVerbs() map[string]sets.Set[string] { - return map[string]sets.Set[string]{ - "pods": sets.New("list", "watch", "get", "patch"), - } -} - func (sef *serviceEnvUpdaterFilter) SetMasterServiceHost(host string) error { sef.host = host return nil diff --git a/pkg/yurthub/filter/serviceenvupdater/filter_test.go b/pkg/yurthub/filter/serviceenvupdater/filter_test.go index 72c80d189a3..092c33a630a 100644 --- a/pkg/yurthub/filter/serviceenvupdater/filter_test.go +++ b/pkg/yurthub/filter/serviceenvupdater/filter_test.go @@ -23,7 +23,6 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" "github.com/openyurtio/openyurt/pkg/util" "github.com/openyurtio/openyurt/pkg/yurthub/filter/base" @@ -49,22 +48,6 @@ func TestName(t *testing.T) { } } -func TestSupportedResourceAndVerbs(t *testing.T) { - nif, _ := NewServiceEnvUpdaterFilter() - rvs := nif.SupportedResourceAndVerbs() - if len(rvs) != 1 { - t.Errorf("supported more than one resources, %v", rvs) - } - for resource, verbs := range rvs { - if resource != "pods" { - t.Errorf("expect resource is pods, but got %s", resource) - } - if !verbs.Equal(sets.New("list", "watch", "get", "patch")) { - t.Errorf("expect verbs are list/watch, but got %v", verbs.UnsortedList()) - } - } -} - func TestFilterServiceEnvUpdater(t *testing.T) { tests := []struct { name string diff --git a/pkg/yurthub/filter/servicetopology/filter.go b/pkg/yurthub/filter/servicetopology/filter.go index ac32e9c85f6..7b2eb1eb5ec 100644 --- a/pkg/yurthub/filter/servicetopology/filter.go +++ b/pkg/yurthub/filter/servicetopology/filter.go @@ -24,7 +24,6 @@ import ( discoveryV1beta1 "k8s.io/api/discovery/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" listers "k8s.io/client-go/listers/core/v1" @@ -73,13 +72,6 @@ func (stf *serviceTopologyFilter) Name() string { return FilterName } -func (stf *serviceTopologyFilter) SupportedResourceAndVerbs() map[string]sets.Set[string] { - return map[string]sets.Set[string]{ - "endpoints": sets.New("list", "watch"), - "endpointslices": sets.New("list", "watch"), - } -} - func (stf *serviceTopologyFilter) SetSharedInformerFactory(factory informers.SharedInformerFactory) error { stf.serviceLister = factory.Core().V1().Services().Lister() stf.serviceSynced = factory.Core().V1().Services().Informer().HasSynced diff --git a/pkg/yurthub/filter/servicetopology/filter_test.go b/pkg/yurthub/filter/servicetopology/filter_test.go index 4461113d1d5..5c1f654911b 100644 --- a/pkg/yurthub/filter/servicetopology/filter_test.go +++ b/pkg/yurthub/filter/servicetopology/filter_test.go @@ -27,7 +27,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/dynamic/fake" "k8s.io/client-go/informers" @@ -61,24 +60,6 @@ func TestName(t *testing.T) { } } -func TestSupportedResourceAndVerbs(t *testing.T) { - stf, _ := NewServiceTopologyFilter() - rvs := stf.SupportedResourceAndVerbs() - if len(rvs) != 2 { - t.Errorf("supported not two resources, %v", rvs) - } - - for resource, verbs := range rvs { - if resource != "endpoints" && resource != "endpointslices" { - t.Errorf("expect resource is endpoints/endpointslices, but got %s", resource) - } - - if !verbs.Equal(sets.New("list", "watch")) { - t.Errorf("expect verbs are list/watch, but got %v", verbs.UnsortedList()) - } - } -} - func TestFilter(t *testing.T) { scheme := runtime.NewScheme() apis.AddToScheme(scheme) diff --git a/pkg/yurthub/proxy/autonomy/autonomy_test.go b/pkg/yurthub/proxy/autonomy/autonomy_test.go index b4741b3437c..f0b88d8184d 100644 --- a/pkg/yurthub/proxy/autonomy/autonomy_test.go +++ b/pkg/yurthub/proxy/autonomy/autonomy_test.go @@ -29,6 +29,7 @@ import ( appsv1beta1 "github.com/openyurtio/openyurt/pkg/apis/apps/v1beta1" "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" + "github.com/openyurtio/openyurt/pkg/yurthub/configuration" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" proxyutil "github.com/openyurtio/openyurt/pkg/yurthub/proxy/util" "github.com/openyurtio/openyurt/pkg/yurthub/storage" @@ -37,9 +38,7 @@ import ( ) var ( - rootDir = "/tmp/cache-local" - fakeClient = fake.NewSimpleClientset() - fakeSharedInformerFactory = informers.NewSharedInformerFactory(fakeClient, 0) + rootDir = "/tmp/cache-local" ) func TestHttpServeKubeletGetNode(t *testing.T) { @@ -49,7 +48,9 @@ func TestHttpServeKubeletGetNode(t *testing.T) { } storageWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM := cachemanager.NewCacheManager("node1", storageWrapper, serializerM, nil, fakeSharedInformerFactory) + fakeSharedInformerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0) + configManager := configuration.NewConfigurationManager("node1", fakeSharedInformerFactory) + cacheM := cachemanager.NewCacheManager(storageWrapper, serializerM, nil, configManager) autonomyProxy := NewAutonomyProxy(nil, cacheM) diff --git a/pkg/yurthub/proxy/local/local_test.go b/pkg/yurthub/proxy/local/local_test.go index 96208acc4d5..e4668733b24 100644 --- a/pkg/yurthub/proxy/local/local_test.go +++ b/pkg/yurthub/proxy/local/local_test.go @@ -37,6 +37,7 @@ import ( "k8s.io/client-go/kubernetes/fake" "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" + "github.com/openyurtio/openyurt/pkg/yurthub/configuration" hubmeta "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" proxyutil "github.com/openyurtio/openyurt/pkg/yurthub/proxy/util" @@ -46,9 +47,7 @@ import ( ) var ( - rootDir = "/tmp/cache-local" - fakeClient = fake.NewSimpleClientset() - fakeSharedInformerFactory = informers.NewSharedInformerFactory(fakeClient, 0) + rootDir = "/tmp/cache-local" ) func newTestRequestInfoResolver() *request.RequestInfoFactory { @@ -65,7 +64,9 @@ func TestServeHTTPForWatch(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM := cachemanager.NewCacheManager("node1", sWrapper, serializerM, nil, fakeSharedInformerFactory) + fakeSharedInformerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0) + configManager := configuration.NewConfigurationManager("node1", fakeSharedInformerFactory) + cacheM := cachemanager.NewCacheManager(sWrapper, serializerM, nil, configManager) fn := func() bool { return false @@ -157,7 +158,9 @@ func TestServeHTTPForWatchWithHealthyChange(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM := cachemanager.NewCacheManager("node1", sWrapper, serializerM, nil, fakeSharedInformerFactory) + fakeSharedInformerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0) + configManager := configuration.NewConfigurationManager("node1", fakeSharedInformerFactory) + cacheM := cachemanager.NewCacheManager(sWrapper, serializerM, nil, configManager) cnt := 0 fn := func() bool { @@ -242,7 +245,9 @@ func TestServeHTTPForWatchWithMinRequestTimeout(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM := cachemanager.NewCacheManager("node1", sWrapper, serializerM, nil, fakeSharedInformerFactory) + fakeSharedInformerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0) + configManager := configuration.NewConfigurationManager("node1", fakeSharedInformerFactory) + cacheM := cachemanager.NewCacheManager(sWrapper, serializerM, nil, configManager) fn := func() bool { return false @@ -334,7 +339,9 @@ func TestServeHTTPForPost(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM := cachemanager.NewCacheManager("node1", sWrapper, serializerM, nil, fakeSharedInformerFactory) + fakeSharedInformerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0) + configManager := configuration.NewConfigurationManager("node1", fakeSharedInformerFactory) + cacheM := cachemanager.NewCacheManager(sWrapper, serializerM, nil, configManager) fn := func() bool { return false @@ -414,7 +421,9 @@ func TestServeHTTPForDelete(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM := cachemanager.NewCacheManager("node1", sWrapper, serializerM, nil, fakeSharedInformerFactory) + fakeSharedInformerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0) + configManager := configuration.NewConfigurationManager("node1", fakeSharedInformerFactory) + cacheM := cachemanager.NewCacheManager(sWrapper, serializerM, nil, configManager) fn := func() bool { return false @@ -481,7 +490,9 @@ func TestServeHTTPForGetReqCache(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM := cachemanager.NewCacheManager("node1", sWrapper, serializerM, nil, fakeSharedInformerFactory) + fakeSharedInformerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0) + configManager := configuration.NewConfigurationManager("node1", fakeSharedInformerFactory) + cacheM := cachemanager.NewCacheManager(sWrapper, serializerM, nil, configManager) fn := func() bool { return false @@ -630,11 +641,14 @@ func TestServeHTTPForListReqCache(t *testing.T) { dStorage, err := disk.NewDiskStorage(rootDir) if err != nil { t.Errorf("failed to create disk storage, %v", err) + return } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() restRESTMapperMgr, _ := hubmeta.NewRESTMapperManager(rootDir) - cacheM := cachemanager.NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory) + fakeSharedInformerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0) + configManager := configuration.NewConfigurationManager("node1", fakeSharedInformerFactory) + cacheM := cachemanager.NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, configManager) fn := func() bool { return false diff --git a/pkg/yurthub/yurtcoordinator/coordinator.go b/pkg/yurthub/yurtcoordinator/coordinator.go index b502ff4768f..b9784fa77ab 100644 --- a/pkg/yurthub/yurtcoordinator/coordinator.go +++ b/pkg/yurthub/yurtcoordinator/coordinator.go @@ -43,6 +43,7 @@ import ( "github.com/openyurtio/openyurt/cmd/yurthub/app/config" "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" + "github.com/openyurtio/openyurt/pkg/yurthub/configuration" "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" yurtrest "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" @@ -122,6 +123,7 @@ type coordinator struct { // node lease contains DelegateHeartBeat label, it will trigger the eventhandler which will // use cloud client to send it to cloud APIServer. delegateNodeLeaseManager *coordinatorLeaseInformerManager + configManager *configuration.Manager } func NewCoordinator( @@ -164,6 +166,7 @@ func NewCoordinator( restMapperMgr: cfg.RESTMapperManager, hubElector: elector, statusInfoChan: make(chan statusInfo, 10), + configManager: cfg.ConfigManager, } poolCacheSyncedDetector := &poolCacheSyncedDetector{ @@ -404,11 +407,10 @@ func (coordinator *coordinator) buildPoolCacheStore() (cachemanager.CacheManager } poolCacheManager := cachemanager.NewCacheManager( - "", cachemanager.NewStorageWrapper(etcdStore), coordinator.serializerMgr, coordinator.restMapperMgr, - coordinator.informerFactory, + coordinator.configManager, ) return poolCacheManager, etcdStore, cancel, nil }