Skip to content

Commit

Permalink
dynamic Router and Cluster from registry center (#632)
Browse files Browse the repository at this point in the history
* dynamic Router and Cluster from registry center

* change cluster identity to ApplicationName+Interface+Method+Version+Group
change path to /ApplicationName/Interface/Method

* add RegisteredType to registry

* fmt

* add check for registry type

* remove envoy

---------

Co-authored-by: lorry <[email protected]>
Co-authored-by: mark4z <[email protected]>
  • Loading branch information
3 people authored Nov 3, 2024
1 parent e55f67e commit a899e49
Show file tree
Hide file tree
Showing 13 changed files with 153 additions and 99 deletions.
14 changes: 1 addition & 13 deletions before_ut.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,4 @@ zkJar="${zkJarPath}/${zkJarName}"
if [ ! -f "${zkJar}" ]; then
mkdir -p ${zkJarPath}
wget -P "${zkJarPath}" ${remoteJarUrl}
fi

# download envoy
envoyPath="out/linux_amd64/"
sudo apt update
sudo apt install -y apt-transport-https gnupg2 curl lsb-release
curl -sL 'https://deb.dl.getenvoy.io/public/gpg.8115BA8E629CC074.key' | sudo gpg --dearmor --yes -o /usr/share/keyrings/getenvoy-keyring.gpg
echo a077cb587a1b622e03aa4bf2f3689de14658a9497a9af2c427bba5f4cc3c4723 /usr/share/keyrings/getenvoy-keyring.gpg | sha256sum --check
echo "deb [arch=amd64 signed-by=/usr/share/keyrings/getenvoy-keyring.gpg] https://deb.dl.getenvoy.io/public/deb/ubuntu $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/getenvoy.list
sudo apt update
sudo apt install -y getenvoy-envoy
mkdir -p ${envoyPath}
cp /usr/bin/envoy ${envoyPath}/envoy
fi
4 changes: 3 additions & 1 deletion pkg/adapter/dubboregistry/registry/base/baseregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,15 @@ func (s *SvcListeners) GetAllListener() map[string]registry.Listener {
}

type BaseRegistry struct {
RegisteredType registry.RegisteredType
svcListeners *SvcListeners
facadeRegistry FacadeRegistry
AdapterListener common.RegistryEventListener
}

func NewBaseRegistry(facade FacadeRegistry, adapterListener common.RegistryEventListener) *BaseRegistry {
func NewBaseRegistry(facade FacadeRegistry, adapterListener common.RegistryEventListener, registerType registry.RegisteredType) *BaseRegistry {
return &BaseRegistry{
RegisteredType: registerType,
facadeRegistry: facade,
svcListeners: &SvcListeners{
listeners: make(map[string]registry.Listener),
Expand Down
17 changes: 11 additions & 6 deletions pkg/adapter/dubboregistry/registry/nacos/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type NacosRegistry struct {
}

func (n *NacosRegistry) DoSubscribe() error {
intfListener, ok := n.nacosListeners[registry.RegisteredTypeInterface]
intfListener, ok := n.nacosListeners[n.RegisteredType]
if !ok {
return errors.New("Listener for interface level registration does not initialized")
}
Expand Down Expand Up @@ -91,9 +91,14 @@ func newNacosRegistry(regConfig model.Registry, adapterListener common.RegistryE
client: client,
nacosListeners: make(map[registry.RegisteredType]registry.Listener),
}
nacosRegistry.nacosListeners[registry.RegisteredTypeInterface] = newNacosIntfListener(client, nacosRegistry, &regConfig, adapterListener)

baseReg := baseRegistry.NewBaseRegistry(nacosRegistry, adapterListener)
nacosRegistry.BaseRegistry = baseReg
return baseReg, nil
nacosRegistry.BaseRegistry = baseRegistry.NewBaseRegistry(nacosRegistry, adapterListener, registry.RegisterTypeFromName(regConfig.RegistryType))
switch nacosRegistry.RegisteredType {
case registry.RegisteredTypeInterface:
nacosRegistry.nacosListeners[nacosRegistry.RegisteredType] = newNacosIntfListener(client, nacosRegistry, &regConfig, adapterListener)
//case registry.RegisteredTypeApplication:
//nacosRegistry.nacosListeners[nacosRegistry.RegisteredType] = newZkAppListener(zkReg.client, zkReg, zkReg.AdapterListener)
default:
return nil, errors.Errorf("Unsupported registry type: %s", regConfig.RegistryType)
}
return nacosRegistry, nil
}
24 changes: 18 additions & 6 deletions pkg/adapter/dubboregistry/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,31 @@ import (

type RegisteredType int8

var RegisteredTypes = []string{"application", "interface"}

const (
RegisteredTypeApplication RegisteredType = iota
RegisteredTypeInterface

RegisteredTypeApplicationName = "application"
RegisteredTypeInterfaceName = "interface"
)

var registryMap = make(map[string]func(model.Registry, common2.RegistryEventListener) (Registry, error), 8)

func (t *RegisteredType) String() string {
return []string{"application", "interface"}[*t]
return RegisteredTypes[*t]
}

func RegisterTypeFromName(name string) RegisteredType {
switch name {
case RegisteredTypeApplicationName:
return RegisteredTypeApplication
case RegisteredTypeInterfaceName:
return RegisteredTypeInterface
default:
return RegisteredTypeInterface
}
}

// Registry interface defines the basic features of a registry
Expand Down Expand Up @@ -134,11 +150,7 @@ func ParseDubboString(urlString string) (config.DubboBackendConfig, []string, st

// GetAPIPattern generate the API path pattern. /application/interface/version
func GetAPIPattern(bkConfig config.DubboBackendConfig) string {
if bkConfig.Version == "" {
// if the version is empty, make sure the url path is valid.
return strings.Join([]string{"/" + bkConfig.ApplicationName, bkConfig.Interface}, constant.PathSlash)
}
return strings.Join([]string{"/" + bkConfig.ApplicationName, bkConfig.Interface, bkConfig.Version}, constant.PathSlash)
return strings.Join([]string{"/" + bkConfig.ApplicationName, bkConfig.Interface}, constant.PathSlash)
}

func GetRouter() model.Router {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,21 @@ import (

import (
dubboCommon "dubbo.apache.org/dubbo-go/v3/common"
ex "dubbo.apache.org/dubbo-go/v3/common/extension"
dubboConst "dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/metadata/definition"
dr "dubbo.apache.org/dubbo-go/v3/registry"
"dubbo.apache.org/dubbo-go/v3/registry/servicediscovery"
"dubbo.apache.org/dubbo-go/v3/remoting/zookeeper/curator_discovery"
"github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config"
"github.com/dubbogo/go-zookeeper/zk"
)

import (
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/common"
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry"
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/remoting/zookeeper"
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/logger"

"github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config"

"github.com/dubbogo/go-zookeeper/zk"
)

var _ registry.Listener = new(applicationServiceListener)
Expand Down Expand Up @@ -154,8 +155,7 @@ func (asl *applicationServiceListener) handleEvent(children []string) {
}
methods, err := asl.getMethods(bkConfig.Interface)
if err != nil {
logger.Warnf("Get methods of interface %s failed; due to %s", bkConfig.Interface, err.Error())
continue
logger.Warnf("Get methods of interface %s failed; use prefix pattern to match url, due to %s", bkConfig.Interface, err.Error())
}

apiPattern := registry.GetAPIPattern(bkConfig)
Expand All @@ -169,8 +169,16 @@ func (asl *applicationServiceListener) handleEvent(children []string) {
MapTo: "opt.types",
},
}
for i := range methods {
api := registry.CreateAPIConfig(apiPattern, location, bkConfig, methods[i], mappingParams)
if methods != nil && len(methods) != 0 {
for i := range methods {
api := registry.CreateAPIConfig(apiPattern, location, bkConfig, methods[i], mappingParams)
if err := asl.adapterListener.OnAddAPI(api); err != nil {
logger.Errorf("Error={%s} happens when try to add api %s", err.Error(), api.Path)
}
}
} else {
// can't fetch methods, use http prefix pattern
api := registry.CreateAPIConfig(apiPattern, location, bkConfig, constant.AnyValue, mappingParams)
if err := asl.adapterListener.OnAddAPI(api); err != nil {
logger.Errorf("Error={%s} happens when try to add api %s", err.Error(), api.Path)
}
Expand All @@ -197,26 +205,16 @@ func (asl *applicationServiceListener) getUrls(path string) []*dubboCommon.URL {
instance := toZookeeperInstance(iss)

metaData := instance.GetMetadata()
metadataStorageType, ok := metaData[constant.MetadataStorageTypeKey]
if !ok {
metadataStorageType = constant.DefaultMetadataStorageType
}
// get metadata service proxy factory according to the metadataStorageType
proxyFactory := ex.GetMetadataServiceProxyFactory(metadataStorageType)
if proxyFactory == nil {
return nil
}
metadataService := proxyFactory.GetProxy(instance)
if metadataService == nil {
logger.Warnf("Get metadataService of instance %s failed", instance)
return nil
}
// call GetExportedURLs to get the exported urls
urls, err := metadataService.GetExportedURLs(constant.AnyValue, constant.AnyValue, constant.AnyValue, constant.AnyValue)
metadataInfo, err := servicediscovery.GetMetadataInfo(instance.GetServiceName(), instance, metaData[dubboConst.ExportedServicesRevisionPropertyName])
if err != nil {
logger.Errorf("Get exported urls of instance %s failed; due to %s", instance, err.Error())
logger.Errorf("get instance %s metadata info error %v", insPath, err.Error())
return nil
}
instance.SetServiceMetadata(metadataInfo)
urls := make([]*dubboCommon.URL, 0)
for _, service := range metadataInfo.Services {
urls = append(urls, instance.ToURLs(service)...)
}
return urls
}

Expand Down
32 changes: 20 additions & 12 deletions pkg/adapter/dubboregistry/registry/zookeeper/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@ import (
)

import (
"github.com/pkg/errors"
)
dubboCommon "dubbo.apache.org/dubbo-go/v3/common"

hessian "github.com/apache/dubbo-go-hessian2"

import (
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/common"
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry"
baseRegistry "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry/base"
zk "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/remoting/zookeeper"
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/model"

"github.com/pkg/errors"
)

var (
Expand All @@ -49,6 +51,9 @@ const (

func init() {
registry.SetRegistry(constant.Zookeeper, newZKRegistry)
hessian.RegisterPOJO(&dubboCommon.MetadataInfo{})
hessian.RegisterPOJO(&dubboCommon.ServiceInfo{})
hessian.RegisterPOJO(&dubboCommon.URL{})
}

type ZKRegistry struct {
Expand All @@ -61,7 +66,7 @@ var _ registry.Registry = new(ZKRegistry)

func newZKRegistry(regConfig model.Registry, adapterListener common.RegistryEventListener) (registry.Registry, error) {
var zkReg = &ZKRegistry{}
baseReg := baseRegistry.NewBaseRegistry(zkReg, adapterListener)
baseReg := baseRegistry.NewBaseRegistry(zkReg, adapterListener, registry.RegisterTypeFromName(regConfig.RegistryType))
timeout, err := time.ParseDuration(regConfig.Timeout)
if err != nil {
return nil, errors.Errorf("Incorrect timeout configuration: %s", regConfig.Timeout)
Expand All @@ -73,15 +78,18 @@ func newZKRegistry(regConfig model.Registry, adapterListener common.RegistryEven
client.RegisterHandler(eventChan)
zkReg.BaseRegistry = baseReg
zkReg.client = client
initZKListeners(zkReg)
zkReg.zkListeners = make(map[registry.RegisteredType]registry.Listener)
switch zkReg.RegisteredType {
case registry.RegisteredTypeInterface:
zkReg.zkListeners[zkReg.RegisteredType] = newZKIntfListener(zkReg.client, zkReg, zkReg.AdapterListener)
case registry.RegisteredTypeApplication:
zkReg.zkListeners[zkReg.RegisteredType] = newZkAppListener(zkReg.client, zkReg, zkReg.AdapterListener)
default:
return nil, errors.Errorf("Unsupported registry type: %s", regConfig.RegistryType)
}
return zkReg, nil
}

func initZKListeners(reg *ZKRegistry) {
reg.zkListeners = make(map[registry.RegisteredType]registry.Listener)
reg.zkListeners[registry.RegisteredTypeInterface] = newZKIntfListener(reg.client, reg, reg.AdapterListener)
}

func (r *ZKRegistry) GetClient() *zk.ZooKeeperClient {
return r.client
}
Expand All @@ -96,7 +104,7 @@ func (r *ZKRegistry) DoSubscribe() error {

// To subscribe service level service discovery
func (r *ZKRegistry) interfaceSubscribe() error {
intfListener, ok := r.zkListeners[registry.RegisteredTypeInterface]
intfListener, ok := r.zkListeners[r.RegisteredType]
if !ok {
return errors.New("Listener for interface level registration does not initialized")
}
Expand All @@ -106,7 +114,7 @@ func (r *ZKRegistry) interfaceSubscribe() error {

// DoUnsubscribe stops monitoring the target registry.
func (r *ZKRegistry) DoUnsubscribe() error {
intfListener, ok := r.zkListeners[registry.RegisteredTypeInterface]
intfListener, ok := r.zkListeners[r.RegisteredType]
if !ok {
return errors.New("Listener for interface level registration does not initialized")
}
Expand Down
15 changes: 6 additions & 9 deletions pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package zookeeper

import (
"strings"
"sync"
"time"
)
Expand All @@ -33,7 +32,6 @@ import (
common2 "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/common"
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry"
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/remoting/zookeeper"
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
)

Expand Down Expand Up @@ -128,16 +126,15 @@ func (zkl *serviceListener) waitEventAndHandlePeriod(children []string, e <-chan

// whenever it is called, the children node changed and refresh the api configuration.
func (zkl *serviceListener) handleEvent() {
// get all children of provider, such as /dubbo-app/org.apache.dubbo.samples.api.DemoService/providers
children, err := zkl.client.GetChildren(zkl.path)
if err != nil {
// disable the API
bkConf, methods, _, _ := registry.ParseDubboString(zkl.url.String())
// disable the service all methods
bkConf, _, _, _ := registry.ParseDubboString(zkl.url.String())
apiPattern := registry.GetAPIPattern(bkConf)
for i := range methods {
path := strings.Join([]string{apiPattern, methods[i]}, constant.PathSlash)
if err := zkl.adapterListener.OnDeleteRouter(config.Resource{Path: path}); err != nil {
logger.Errorf("Error={%s} when try to remove API by path: %s", err.Error(), path)
}
// delete all config of an interface, such as /dubbo-app/org.apache.dubbo.samples.api.DemoService
if err := zkl.adapterListener.OnDeleteRouter(config.Resource{Path: apiPattern}); err != nil {
logger.Errorf("Error={%s} when try to remove API by path: %s", err.Error(), apiPattern)
}
return
}
Expand Down
Loading

0 comments on commit a899e49

Please sign in to comment.