Skip to content

Commit

Permalink
feat: added support for configcenter listening and logger hot-reloading
Browse files Browse the repository at this point in the history
  • Loading branch information
mutezebra committed Jan 12, 2025
1 parent ee4a269 commit 010b6be
Show file tree
Hide file tree
Showing 18 changed files with 585 additions and 88 deletions.
7 changes: 5 additions & 2 deletions configcenter/configclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

package configcenter

import "github.com/apache/dubbo-go-pixiu/pkg/model"

type (
ConfigClient interface {
LoadConfig(properties map[string]interface{}) (string, error)

ListenConfig(properties map[string]interface{}) (err error)
}

ListenConfig func(data string)
// ViewConfig returns the current remote configuration.
ViewConfig() *model.Bootstrap
}
)
28 changes: 21 additions & 7 deletions configcenter/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

import (
"github.com/ghodss/yaml"
"gopkg.in/yaml.v3"
)

import (
Expand All @@ -42,6 +42,7 @@ var Parsers = map[string]func(data []byte, v interface{}) error{
type (
Load interface {
LoadConfigs(boot *model.Bootstrap, opts ...Option) (v *model.Bootstrap, err error)
ViewRemoteConfig() *model.Bootstrap
}

Option func(opt *Options)
Expand All @@ -61,12 +62,14 @@ type DefaultConfigLoad struct {
}

func NewConfigLoad(bootConfig *model.Bootstrap) *DefaultConfigLoad {

var configClient ConfigClient

var err error
// config center load
if strings.EqualFold(bootConfig.Config.Type, KEY_CONFIG_TYPE_NACOS) {
configClient, _ = NewNacosConfig(bootConfig)
configClient, err = NewNacosConfig(bootConfig)
if err != nil {
logger.Errorf("Get new nacos config failed,err: %v", err)
}
}

if configClient == nil {
Expand All @@ -81,7 +84,6 @@ func NewConfigLoad(bootConfig *model.Bootstrap) *DefaultConfigLoad {
}

func (d *DefaultConfigLoad) LoadConfigs(boot *model.Bootstrap, opts ...Option) (v *model.Bootstrap, err error) {

var opt Options
for _, o := range opts {
o(&opt)
Expand All @@ -104,7 +106,6 @@ func (d *DefaultConfigLoad) LoadConfigs(boot *model.Bootstrap, opts ...Option) (
}

data, err := d.configClient.LoadConfig(m)

if err != nil {
return nil, err
}
Expand All @@ -114,11 +115,24 @@ func (d *DefaultConfigLoad) LoadConfigs(boot *model.Bootstrap, opts ...Option) (
return boot, err
}

err = Parsers[".yml"]([]byte(data), boot)
if err = Parsers[".yml"]([]byte(data), boot); err != nil {
logger.Errorf("failed to parse the configuration loaded from the remote,err: %v", err)
return boot, err
}

if err = d.configClient.ListenConfig(m); err != nil {
logger.Errorf("failed to listen the remote configcenter config,err: %v", err)
return boot, err
}

return boot, err
}

// ViewRemoteConfig returns the current remote configuration.
func (d *DefaultConfigLoad) ViewRemoteConfig() *model.Bootstrap {
return d.configClient.ViewConfig()
}

func ParseYamlBytes(content []byte, v interface{}) error {
return yaml.Unmarshal(content, v)
}
7 changes: 3 additions & 4 deletions configcenter/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
)

func getBootstrap() *model.Bootstrap {

return &model.Bootstrap{
Config: &model.ConfigCenter{
Type: "nacos",
Expand Down Expand Up @@ -69,7 +68,7 @@ func TestDefaultConfigLoad_LoadConfigs(t *testing.T) {
boot *model.Bootstrap
opts []Option
}
var tests = []struct {
tests := []struct {
name string
fields fields
args args
Expand Down Expand Up @@ -139,8 +138,8 @@ func TestDefaultConfigLoad_LoadConfigs(t *testing.T) {
t.Errorf("LoadConfigs() error = %v, wantErr %v", err, tt.wantErr)
return
}
//assert.True(t, gotV.Nacos.DataId == DataId, "load config by nacos config center error!")
//assert.True(t, len(gotV.StaticResources.Listeners) > 0, "load config by nacos config center error!")
// assert.True(t, gotV.Nacos.DataId == DataId, "load config by nacos config center error!")
// assert.True(t, len(gotV.StaticResources.Listeners) > 0, "load config by nacos config center error!")
conf, _ := json.Marshal(gotV)
logger.Infof("config of Bootstrap load by nacos : %v", string(conf))
})
Expand Down
121 changes: 70 additions & 51 deletions configcenter/nacos_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@
package configcenter

import (
"sync"

"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/vo"
"github.com/pkg/errors"
)

import (
"github.com/apache/dubbo-go-pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pkg/model"
)

// Constants for configuration keys.
const (
KeyDataId = "dataId"
KeyGroup = "group"
Expand All @@ -39,6 +40,7 @@ const (
KeyTenant = "tenant"
)

// Constants for Nacos configuration.
const (
DataId = "pixiu.yaml"
Group = "DEFAULT_GROUP"
Expand All @@ -50,29 +52,41 @@ const (
Scheme = "http"
)

type (
NacosConfig struct {
client config_client.IConfigClient
// NacosConfig represents the Nacos configuration client and its state.
type NacosConfig struct {
client config_client.IConfigClient
remoteConfig *model.Bootstrap
mu sync.Mutex
}

// todo not support now
listenConfigCallback ListenConfig
// NewNacosConfig creates a new NacosConfig instance.
// It returns an error if no Nacos server is configured or if the client cannot be created.
func NewNacosConfig(boot *model.Bootstrap) (ConfigClient, error) {
if len(boot.Nacos.ServerConfigs) == 0 {
return nil, errors.New("no Nacos server configured")
}
)

func NewNacosConfig(boot *model.Bootstrap) (configClient ConfigClient, err error) {

var sc []constant.ServerConfig
if len(boot.Nacos.ServerConfigs) == 0 {
return nil, errors.New("no nacos server be setted")
nacosClient, err := getNacosConfigClient(boot)
if err != nil {
return nil, err
}
for _, serveConfig := range boot.Nacos.ServerConfigs {
sc = append(sc, constant.ServerConfig{
Port: serveConfig.Port,
IpAddr: serveConfig.IpAddr,

return &NacosConfig{
client: nacosClient,
}, nil
}

// getNacosConfigClient initializes and returns a Nacos config client.
func getNacosConfigClient(boot *model.Bootstrap) (config_client.IConfigClient, error) {
var serverConfigs []constant.ServerConfig
for _, serverConfig := range boot.Nacos.ServerConfigs {
serverConfigs = append(serverConfigs, constant.ServerConfig{
Port: serverConfig.Port,
IpAddr: serverConfig.IpAddr,
})
}

cc := constant.ClientConfig{
clientConfig := constant.ClientConfig{
NamespaceId: boot.Nacos.ClientConfig.NamespaceId,
TimeoutMs: boot.Nacos.ClientConfig.TimeoutMs,
NotLoadCacheAtStart: boot.Nacos.ClientConfig.NotLoadCacheAtStart,
Expand All @@ -81,56 +95,61 @@ func NewNacosConfig(boot *model.Bootstrap) (configClient ConfigClient, err error
LogLevel: boot.Nacos.ClientConfig.LogLevel,
}

pa := vo.NacosClientParam{
ClientConfig: &cc,
ServerConfigs: sc,
}
nacos, err := clients.NewConfigClient(pa)
if err != nil {
return nil, err
}
configClient = &NacosConfig{
client: nacos,
clientParam := vo.NacosClientParam{
ClientConfig: &clientConfig,
ServerConfigs: serverConfigs,
}

return configClient, nil
return clients.NewConfigClient(clientParam)
}

// LoadConfig retrieves the configuration from Nacos based on the provided parameters.
func (n *NacosConfig) LoadConfig(param map[string]interface{}) (string, error) {
return n.client.GetConfig(vo.ConfigParam{
DataId: getOrDefault(param[KeyDataId].(string), DataId),
Group: getOrDefault(param[KeyGroup].(string), Group),
})
}

func getOrDefault(target string, quiet string) string {
// getOrDefault returns the target value if it is not empty; otherwise, it returns the fallback value.
func getOrDefault(target, fallback string) string {
if len(target) == 0 {
target = quiet
return fallback
}
return target
}

func (n *NacosConfig) ListenConfig(param map[string]interface{}) (err error) {
// todo noop, not support
if true {
return nil
}
listen := n.listen(getOrDefault(param[KeyDataId].(string), DataId), getOrDefault(param[KeyGroup].(string), Group))
return listen()
// ListenConfig listens for configuration changes in Nacos.
func (n *NacosConfig) ListenConfig(param map[string]interface{}) error {
return n.client.ListenConfig(vo.ConfigParam{
DataId: getOrDefault(param[KeyDataId].(string), DataId),
Group: getOrDefault(param[KeyGroup].(string), Group),
OnChange: n.onChange,
})
}

func (n *NacosConfig) listen(dataId, group string) func() error {
return func() error {
return n.client.ListenConfig(vo.ConfigParam{
DataId: dataId,
Group: group,
OnChange: func(namespace, group, dataId, data string) {
if len(data) == 0 {
logger.Errorf("nacos listen callback data nil error , namespace : %s,group : %s , dataId : %s , data : %s")
return
}
n.listenConfigCallback(data)
},
})
// onChange is the callback function triggered when the configuration changes in Nacos.
func (n *NacosConfig) onChange(namespace, group, dataId, data string) {
n.mu.Lock()
defer n.mu.Unlock()

if len(data) == 0 {
logger.Errorf("Nacos listen callback data is nil. Namespace: %s, Group: %s, DataId: %s", namespace, group, dataId)
return
}

boot := new(model.Bootstrap)
if err := Parsers[".yml"]([]byte(data), boot); err != nil {
logger.Errorf("Failed to parse the configuration loaded from the remote. Error: %v", err)
return
}

n.remoteConfig = boot
}

// ViewConfig returns the current remote configuration.
func (n *NacosConfig) ViewConfig() *model.Bootstrap {
n.mu.Lock()
defer n.mu.Unlock()
return n.remoteConfig
}
88 changes: 88 additions & 0 deletions configcenter/nacos_load_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package configcenter

import (
"fmt"
"io"
"os"
"path"
"strings"
"testing"

"github.com/apache/dubbo-go-pixiu/pkg/logger"
. "github.com/smartystreets/goconvey/convey"
)

// isNacosRunning checks whether the Nacos server is running.
// It returns true if Nacos is running, otherwise false.
func isNacosRunning(t *testing.T) bool {
t.Helper()
_, err := getNacosConfigClient(getBootstrap())
return err == nil
}

// TestNewNacosConfig tests the creation of a new Nacos configuration.
// If Nacos is not running, the test is skipped.
func TestNewNacosConfig(t *testing.T) {
if !isNacosRunning(t) {
t.Skip("Nacos is not running, skipping the test.")
return
}

Convey("Test NewNacosConfig", t, func() {
cfg := getBootstrap()

// Test successful creation of NacosConfig.
_, err := NewNacosConfig(cfg)
So(err, ShouldBeNil)

// Test creation failure when Nacos server configurations are missing.
cfg.Nacos.ServerConfigs = nil
_, err = NewNacosConfig(cfg)
So(err, ShouldNotBeNil)
})
}

// TestNacosConfig_onChange tests the onChange method of NacosConfig.
func TestNacosConfig_onChange(t *testing.T) {
Convey("TestNacosConfig_onChange", t, func() {
cfg := getBootstrap()
c, err := NewNacosConfig(cfg)
So(err, ShouldBeNil)

client, ok := c.(*NacosConfig)
So(ok, ShouldBeTrue)

// Verify the current working directory.
wd, err := os.Getwd()
So(err, ShouldBeNil)

paths := strings.Split(wd, "/")
So(paths[len(paths)-1], ShouldEqual, "configcenter")

// Open the configuration file for testing.
file, err := os.Open(fmt.Sprintf("/%s/configs/conf.yaml", path.Join(paths[:len(paths)-1]...)))
So(err, ShouldBeNil)
defer func() { So(file.Close(), ShouldBeNil) }()

conf, err := io.ReadAll(file)
So(err, ShouldBeNil)

Convey("Test onChange with valid input", func() {
So(client.remoteConfig, ShouldBeNil)
client.onChange(Namespace, Group, DataId, string(conf))
So(client.remoteConfig, ShouldNotBeNil)
})

Convey("Test onChange with empty input", func() {
// Suppress logs during this test.
logger.SetLoggerLevel("fatal")

client.remoteConfig = nil
client.onChange(Namespace, Group, DataId, "")
So(client.remoteConfig, ShouldBeNil)

// Restore the logger level.
logger.SetLoggerLevel("info")
})
})
}
Loading

0 comments on commit 010b6be

Please sign in to comment.