Skip to content

Commit

Permalink
add multiplexer proxy.
Browse files Browse the repository at this point in the history
  • Loading branch information
zyjhtangtang committed Jan 2, 2025
1 parent eed8a55 commit 59483aa
Show file tree
Hide file tree
Showing 40 changed files with 2,176 additions and 171 deletions.
35 changes: 35 additions & 0 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
apiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
Expand All @@ -54,11 +55,26 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/filter/manager"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/multiplexer"
"github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/network"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

var AllowedMultiplexerResources = []schema.GroupVersionResource{
{
Group: "",
Version: "v1",
Resource: "services",
},
{
Group: "discovery.k8s.io",
Version: "v1",
Resource: "endpointslices",
},
}

// YurtHubConfiguration represents configuration of yurthub
type YurtHubConfiguration struct {
LBMode string
Expand Down Expand Up @@ -101,6 +117,9 @@ type YurtHubConfiguration struct {
CoordinatorClient kubernetes.Interface
LeaderElection componentbaseconfig.LeaderElectionConfiguration
HostControlPlaneAddr string // ip:port
PostStartHooks map[string]func() error
RequestMultiplexerManager multiplexer.MultiplexerManager
MultiplexerResources []schema.GroupVersionResource
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand Down Expand Up @@ -176,6 +195,8 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
CoordinatorStorageAddr: options.CoordinatorStorageAddr,
LeaderElection: options.LeaderElection,
HostControlPlaneAddr: options.HostControlPlaneAddr,
MultiplexerResources: AllowedMultiplexerResources,
RequestMultiplexerManager: newMultiplexerCacheManager(options),
}

// if yurthub is in local mode, certMgr and networkMgr are no need to start
Expand Down Expand Up @@ -403,3 +424,17 @@ func prepareServerServing(options *options.YurtHubOptions, certMgr certificate.Y

return nil
}

func newMultiplexerCacheManager(options *options.YurtHubOptions) multiplexer.MultiplexerManager {
config := newRestConfig(options.NodeName, options.YurtHubProxyHost, options.YurtHubProxyPort)
rsm := storage.NewStorageManager(config)

return multiplexer.NewRequestsMultiplexerManager(rsm)
}

func newRestConfig(nodeName string, host string, port int) *rest.Config {
return &rest.Config{
Host: fmt.Sprintf("http://%s:%d", host, port),
UserAgent: util.MultiplexerProxyClientUserAgentPrefix + nodeName,
}
}
2 changes: 1 addition & 1 deletion cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
var cacheMgr cachemanager.CacheManager
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace)
cacheMgr = cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory)
cacheMgr = cachemanager.NewCacheManager(cfg.NodeName, cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory)

Check warning on line 139 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L139

Added line #L139 was not covered by tests
} else {
klog.Infof("%d. disable cache manager for node %s because it is a cloud node", trace, cfg.NodeName)
}
Expand Down
Binary file added cmd/yurthub/yurthub
Binary file not shown.
14 changes: 8 additions & 6 deletions pkg/yurthub/cachemanager/cache_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@ const (

type CacheAgent struct {
sync.Mutex
agents sets.Set[string]
store StorageWrapper
agents sets.Set[string]
store StorageWrapper
nodeName string
}

func NewCacheAgents(informerFactory informers.SharedInformerFactory, store StorageWrapper) *CacheAgent {
func NewCacheAgents(nodeName string, informerFactory informers.SharedInformerFactory, store StorageWrapper) *CacheAgent {
ca := &CacheAgent{
agents: sets.New(util.DefaultCacheAgents...),
store: store,
agents: sets.New(util.DefaultCacheAgents...).Insert(util.MultiplexerProxyClientUserAgentPrefix + nodeName),
store: store,
nodeName: nodeName,
}
configmapInformer := informerFactory.Core().V1().ConfigMaps().Informer()
configmapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -108,7 +110,7 @@ func (ca *CacheAgent) deleteConfigmap(obj interface{}) {

// updateCacheAgents update cache agents
func (ca *CacheAgent) updateCacheAgents(cacheAgents, action string) sets.Set[string] {
newAgents := sets.New(util.DefaultCacheAgents...)
newAgents := sets.New(util.DefaultCacheAgents...).Insert(util.MultiplexerProxyClientUserAgentPrefix + ca.nodeName)
for _, agent := range strings.Split(cacheAgents, sepForAgent) {
agent = strings.TrimSpace(agent)
if len(agent) != 0 {
Expand Down
13 changes: 7 additions & 6 deletions pkg/yurthub/cachemanager/cache_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,38 +36,39 @@ func TestUpdateCacheAgents(t *testing.T) {
"two new agents updated": {
initAgents: []string{},
cacheAgents: "agent1,agent2",
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...),
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"),
deletedAgents: sets.Set[string]{},
},
"two new agents updated but an old agent deleted": {
initAgents: []string{"agent1", "agent2"},
cacheAgents: "agent2,agent3",
resultAgents: sets.New(append([]string{"agent2", "agent3"}, util.DefaultCacheAgents...)...),
resultAgents: sets.New(append([]string{"agent2", "agent3"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"),
deletedAgents: sets.New("agent1"),
},
"no agents updated ": {
initAgents: []string{"agent1", "agent2"},
cacheAgents: "agent1,agent2",
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...),
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"),
deletedAgents: sets.New[string](),
},
"no agents updated with default": {
initAgents: []string{"agent1", "agent2", "kubelet"},
cacheAgents: "agent1,agent2",
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...),
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"),
deletedAgents: sets.New[string](),
},
"empty agents added ": {
initAgents: []string{},
cacheAgents: "",
resultAgents: sets.New(util.DefaultCacheAgents...),
resultAgents: sets.New(util.DefaultCacheAgents...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"),
deletedAgents: sets.New[string](),
},
}
for k, tt := range testcases {
t.Run(k, func(t *testing.T) {
m := &CacheAgent{
agents: sets.New(tt.initAgents...),
agents: sets.New(tt.initAgents...),
nodeName: "iz2ze21g5pq9jbesubrksvz",
}

m.updateCacheAgents(strings.Join(tt.initAgents, ","), "")
Expand Down
3 changes: 2 additions & 1 deletion pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,13 @@ type cacheManager struct {

// NewCacheManager creates a new CacheManager
func NewCacheManager(
nodeName string,
storagewrapper StorageWrapper,
serializerMgr *serializer.SerializerManager,
restMapperMgr *hubmeta.RESTMapperManager,
sharedFactory informers.SharedInformerFactory,
) CacheManager {
cacheAgents := NewCacheAgents(sharedFactory, storagewrapper)
cacheAgents := NewCacheAgents(nodeName, sharedFactory, storagewrapper)
cm := &cacheManager{
storage: storagewrapper,
serializerManager: serializerMgr,
Expand Down
12 changes: 6 additions & 6 deletions pkg/yurthub/cachemanager/cache_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestCacheGetResponse(t *testing.T) {
}
sWrapper := NewStorageWrapper(dStorage)
serializerM := serializer.NewSerializerManager()
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
inputObj runtime.Object
Expand Down Expand Up @@ -607,7 +607,7 @@ func TestCacheWatchResponse(t *testing.T) {
}
sWrapper := NewStorageWrapper(dStorage)
serializerM := serializer.NewSerializerManager()
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
inputObj []watch.Event
Expand Down Expand Up @@ -1014,7 +1014,7 @@ func TestCacheListResponse(t *testing.T) {
if err != nil {
t.Errorf("failed to create RESTMapper manager, %v", err)
}
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
inputObj runtime.Object
Expand Down Expand Up @@ -1607,7 +1607,7 @@ func TestQueryCacheForGet(t *testing.T) {
if err != nil {
t.Errorf("failed to create RESTMapper manager, %v", err)
}
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
keyBuildInfo storage.KeyBuildInfo
Expand Down Expand Up @@ -2334,7 +2334,7 @@ func TestQueryCacheForList(t *testing.T) {
if err != nil {
t.Errorf("failed to create RESTMapper manager, %v", err)
}
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
keyBuildInfo *storage.KeyBuildInfo
Expand Down Expand Up @@ -3158,7 +3158,7 @@ func TestCanCacheFor(t *testing.T) {
defer close(stop)
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
m := NewCacheManager(s, nil, nil, informerFactory)
m := NewCacheManager("node1", s, nil, nil, informerFactory)
informerFactory.Start(nil)
cache.WaitForCacheSync(stop, informerFactory.Core().V1().ConfigMaps().Informer().HasSynced)
if tt.preRequest != nil {
Expand Down
53 changes: 53 additions & 0 deletions pkg/yurthub/filter/filterchain/filterchain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
Copyright 2024 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package filterchain

import (
"strings"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"

yurtutil "github.com/openyurtio/openyurt/pkg/util"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
)

type FilterChain []filter.ObjectFilter

func (chain FilterChain) Name() string {
var names []string
for i := range chain {
names = append(names, chain[i].Name())

Check warning on line 34 in pkg/yurthub/filter/filterchain/filterchain.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/filterchain/filterchain.go#L31-L34

Added lines #L31 - L34 were not covered by tests
}
return strings.Join(names, ",")

Check warning on line 36 in pkg/yurthub/filter/filterchain/filterchain.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/filterchain/filterchain.go#L36

Added line #L36 was not covered by tests
}

func (chain FilterChain) SupportedResourceAndVerbs() map[string]sets.Set[string] {

Check warning on line 39 in pkg/yurthub/filter/filterchain/filterchain.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/filterchain/filterchain.go#L39

Added line #L39 was not covered by tests
// do nothing
return map[string]sets.Set[string]{}

Check warning on line 41 in pkg/yurthub/filter/filterchain/filterchain.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/filterchain/filterchain.go#L41

Added line #L41 was not covered by tests
}

func (chain FilterChain) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object {
for i := range chain {
obj = chain[i].Filter(obj, stopCh)
if yurtutil.IsNil(obj) {
break

Check warning on line 48 in pkg/yurthub/filter/filterchain/filterchain.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/filterchain/filterchain.go#L44-L48

Added lines #L44 - L48 were not covered by tests
}
}

return obj

Check warning on line 52 in pkg/yurthub/filter/filterchain/filterchain.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/filterchain/filterchain.go#L52

Added line #L52 was not covered by tests
}
5 changes: 5 additions & 0 deletions pkg/yurthub/filter/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@ type ObjectFilter interface {
Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object
}

type FilterFinder interface {
FindResponseFilter(req *http.Request) (ResponseFilter, bool)
FindObjectFilters(req *http.Request) ObjectFilter
}

type NodeGetter func(name string) (*v1.Node, error)
18 changes: 18 additions & 0 deletions pkg/yurthub/filter/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/approver"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/base"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/filterchain"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/responsefilter"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
Expand Down Expand Up @@ -111,3 +112,20 @@ func (m *Manager) FindResponseFilter(req *http.Request) (filter.ResponseFilter,

return nil, false
}

func (m *Manager) FindObjectFilters(req *http.Request) filter.ObjectFilter {
objectFilters := make([]filter.ObjectFilter, 0)
approved, filterNames := m.Approver.Approve(req)
if !approved {
return nil

Check warning on line 120 in pkg/yurthub/filter/manager/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/manager/manager.go#L116-L120

Added lines #L116 - L120 were not covered by tests
}

for i := range filterNames {
if objectFilter, ok := m.nameToObjectFilter[filterNames[i]]; ok {
objectFilters = append(objectFilters, objectFilter)

Check warning on line 125 in pkg/yurthub/filter/manager/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/manager/manager.go#L123-L125

Added lines #L123 - L125 were not covered by tests
}
}

filters := filterchain.FilterChain(objectFilters)
return filters

Check warning on line 130 in pkg/yurthub/filter/manager/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/manager/manager.go#L129-L130

Added lines #L129 - L130 were not covered by tests
}
1 change: 0 additions & 1 deletion pkg/yurthub/multiplexer/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

type Interface interface {
Watch(ctx context.Context, key string, opts kstorage.ListOptions) (watch.Interface, error)
Get(ctx context.Context, key string, opts kstorage.GetOptions, objPtr runtime.Object) error
GetList(ctx context.Context, key string, opts kstorage.ListOptions, listObj runtime.Object) error
}

Expand Down
Loading

0 comments on commit 59483aa

Please sign in to comment.