Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(other): add zeromq server with configration #1660

Merged
merged 15 commits into from
Jan 8, 2025
1 change: 1 addition & 0 deletions .github/workflows/semantic-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ jobs:
http
jsonrpc
nanomsg
zeromq
windows
linux
macos
Expand Down
11 changes: 11 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"github.com/pactus-project/pactus/www/http"
"github.com/pactus-project/pactus/www/jsonrpc"
"github.com/pactus-project/pactus/www/nanomsg"
"github.com/pactus-project/pactus/www/zmq"
"github.com/pelletier/go-toml/v2"
)

Expand Down Expand Up @@ -47,6 +48,7 @@
HTTP *http.Config `toml:"http"`
WalletManager *wallet.Config `toml:"-"`
Nanomsg *nanomsg.Config `toml:"nanomsg"`
ZeroMq *zmq.Config `toml:"zeromq"`
}

type BootstrapInfo struct {
Expand Down Expand Up @@ -99,6 +101,7 @@
JSONRPC: jsonrpc.DefaultConfig(),
HTTP: http.DefaultConfig(),
Nanomsg: nanomsg.DefaultConfig(),
ZeroMq: zmq.DefaultConfig(),
WalletManager: wallet.DefaultConfig(),
}

Expand Down Expand Up @@ -219,6 +222,11 @@
conf.HTTP.EnablePprof = true
conf.Nanomsg.Enable = true
conf.Nanomsg.Listen = "tcp://[::]:40799"
conf.ZeroMq.ZmqPubBlockInfo = "tcp://127.0.0.1:28332"
conf.ZeroMq.ZmqPubTxInfo = "tcp://127.0.0.1:28333"
conf.ZeroMq.ZmqPubRawBlock = "tcp://127.0.0.1:28334"
conf.ZeroMq.ZmqPubRawTx = "tcp://127.0.0.1:28335"
conf.ZeroMq.ZmqPubHWM = 1000

return conf
}
Expand Down Expand Up @@ -296,6 +304,9 @@
if err := conf.GRPC.BasicCheck(); err != nil {
return err
}
if err := conf.ZeroMq.BasicCheck(); err != nil {
return err
}

Check warning on line 309 in config/config.go

View check run for this annotation

Codecov / codecov/patch

config/config.go#L308-L309

Added lines #L308 - L309 were not covered by tests

return conf.HTTP.BasicCheck()
}
30 changes: 30 additions & 0 deletions config/example_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@
_pool = 'error'
_state = 'info'
_sync = 'error'
_zmq = 'info'
default = 'info'

# `grpc` contains configuration of the gRPC module.
Expand Down Expand Up @@ -246,3 +247,32 @@

# `listen` is the address for incoming connections to the nanomsg server.
listen = 'tcp://127.0.0.1:40899'

# ZeroMQ configuration.
[zeromq]

# `zmqpubblockinfo` specifies the address for publishing block info notifications.
# Example: 'tcp://127.0.0.1:28332'
# Default is '', meaning the topic is disabled
zmqpubblockinfo = ''

# `zmqpubtxinfo` specifies the address for publishing transaction info notifications.
# Example: 'tcp://127.0.0.1:28332'
# Default is '', meaning the topic is disabled
zmqpubtxinfo = ''

# `zmqpubrawblock` specifies the address for publishing raw block notifications.
# Example: 'tcp://127.0.0.1:28332'
# Default is '', meaning the topic is disabled
zmqpubrawblock = ''

# `zmqpubrawtx` specifies the address for publishing raw transaction notifications.
# Example: 'tcp://127.0.0.1:28332'
# Default is '', meaning the topic is disabled
zmqpubrawtx = ''

# `zmqpubhwm` defines the High Watermark (HWM) for ZeroMQ message pipes.
# This parameter determines the maximum number of messages ZeroMQ can buffer before blocking the publishing of further messages.
# The watermark is applied uniformly to all active topics.
# Default is 1000
zmqpubhwm = 1000
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/go-zeromq/goczmq/v4 v4.2.2 // indirect
Ja7ad marked this conversation as resolved.
Show resolved Hide resolved
github.com/go-zeromq/zmq4 v0.17.0 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0=
github.com/go-zeromq/goczmq/v4 v4.2.2 h1:HAJN+i+3NW55ijMJJhk7oWxHKXgAuSBkoFfvr8bYj4U=
github.com/go-zeromq/goczmq/v4 v4.2.2/go.mod h1:Sm/lxrfxP/Oxqs0tnHD6WAhwkWrx+S+1MRrKzcxoaYE=
github.com/go-zeromq/zmq4 v0.17.0 h1:r12/XdqPeRbuaF4C3QZJeWCt7a5vpJbslDH1rTXF+Kc=
github.com/go-zeromq/zmq4 v0.17.0/go.mod h1:EQxjJD92qKnrsVMzAnx62giD6uJIPi1dMGZ781iCDtY=
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk=
Expand Down
1 change: 1 addition & 0 deletions util/logger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func DefaultConfig() *Config {
conf.Levels["_grpc"] = "info"
conf.Levels["_nonomsg"] = "info"
conf.Levels["_jsonrpc"] = "info"
conf.Levels["_zmq"] = "info"
conf.Levels["_firewall"] = "warn"

return conf
Expand Down
1 change: 1 addition & 0 deletions util/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func getLoggersInst() *logger {
conf.Levels["_pool"] = "debug"
conf.Levels["_http"] = "debug"
conf.Levels["_grpc"] = "debug"
conf.Levels["_zmq"] = "debug"
conf.Levels["_firewall"] = "debug"
globalInst = &logger{
config: conf,
Expand Down
10 changes: 10 additions & 0 deletions util/testsuite/testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package testsuite
import (
"encoding/hex"
"math/rand"
"net"
"testing"
"time"

Expand Down Expand Up @@ -866,3 +867,12 @@ func (*TestSuite) HelperSignTransaction(prv crypto.PrivateKey, trx *tx.Tx) {
trx.SetSignature(sig)
trx.SetPublicKey(prv.PublicKey())
}

func (*TestSuite) FindFreePort() int {
listener, _ := net.Listen("tcp", "localhost:0")
defer func() {
_ = listener.Close()
}()

return listener.Addr().(*net.TCPAddr).Port
}
26 changes: 26 additions & 0 deletions www/zmq/block_info_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package zmq

import (
"github.com/go-zeromq/zmq4"
"github.com/pactus-project/pactus/types/block"
"github.com/pactus-project/pactus/util/logger"
)

type blockInfoPub struct {
basePub
}

func newBlockInfoPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher {
return &blockInfoPub{
basePub: basePub{
topic: BlockInfo,
zmqSocket: socket,
logger: logger,
},
}
}

func (*blockInfoPub) onNewBlock(_ *block.Block) {
// TODO implement me
panic("implement me")

Check warning on line 25 in www/zmq/block_info_publisher.go

View check run for this annotation

Codecov / codecov/patch

www/zmq/block_info_publisher.go#L23-L25

Added lines #L23 - L25 were not covered by tests
}
92 changes: 92 additions & 0 deletions www/zmq/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package zmq

import (
"errors"
"fmt"
"net/url"
"strings"
"time"
)

type Config struct {
ZmqPubBlockInfo string `toml:"zmqpubblockinfo"`
ZmqPubTxInfo string `toml:"zmqpubtxinfo"`
ZmqPubRawBlock string `toml:"zmqpubrawblock"`
ZmqPubRawTx string `toml:"zmqpubrawtx"`
ZmqPubHWM int `toml:"zmqpubhwm"`

// Private config
ZmqAutomaticReconnect bool `toml:"-"`
ZmqDialerRetryTime time.Duration `toml:"-"`
ZmqDialerMaxRetries int `toml:"-"`
}

func DefaultConfig() *Config {
return &Config{
ZmqAutomaticReconnect: true,
ZmqDialerMaxRetries: 10,
ZmqDialerRetryTime: 250 * time.Millisecond,
ZmqPubHWM: 1000,
}
}

func (c *Config) BasicCheck() error {
if c.ZmqPubBlockInfo != "" {
if err := validateTopicSocket(c.ZmqPubBlockInfo); err != nil {
return err
}
}

if c.ZmqPubTxInfo != "" {
if err := validateTopicSocket(c.ZmqPubTxInfo); err != nil {
return err
}

Check warning on line 43 in www/zmq/config.go

View check run for this annotation

Codecov / codecov/patch

www/zmq/config.go#L42-L43

Added lines #L42 - L43 were not covered by tests
}

if c.ZmqPubRawBlock != "" {
if err := validateTopicSocket(c.ZmqPubRawBlock); err != nil {
return err
}

Check warning on line 49 in www/zmq/config.go

View check run for this annotation

Codecov / codecov/patch

www/zmq/config.go#L48-L49

Added lines #L48 - L49 were not covered by tests
}

if c.ZmqPubRawTx != "" {
if err := validateTopicSocket(c.ZmqPubRawTx); err != nil {
return err
}

Check warning on line 55 in www/zmq/config.go

View check run for this annotation

Codecov / codecov/patch

www/zmq/config.go#L54-L55

Added lines #L54 - L55 were not covered by tests
}

if c.ZmqPubHWM < 0 {
return fmt.Errorf("invalid publisher hwm %d", c.ZmqPubHWM)
}

return nil
}

func validateTopicSocket(socket string) error {
addr, err := url.Parse(socket)
if err != nil {
return errors.New("failed to parse ZmqPub value: " + err.Error())
}

Check warning on line 69 in www/zmq/config.go

View check run for this annotation

Codecov / codecov/patch

www/zmq/config.go#L68-L69

Added lines #L68 - L69 were not covered by tests

if addr.Scheme != "tcp" {
return errors.New("invalid scheme: zeromq socket schema")
}

if addr.Host == "" {
return errors.New("invalid host: host is empty")
}

Check warning on line 77 in www/zmq/config.go

View check run for this annotation

Codecov / codecov/patch

www/zmq/config.go#L76-L77

Added lines #L76 - L77 were not covered by tests

parts := strings.Split(addr.Host, ":")
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
return errors.New("invalid host: missing or malformed host/port")
}

port := parts[1]
for _, r := range port {
if r < '0' || r > '9' {
return errors.New("invalid port: non-numeric characters detected")
}

Check warning on line 88 in www/zmq/config.go

View check run for this annotation

Codecov / codecov/patch

www/zmq/config.go#L87-L88

Added lines #L87 - L88 were not covered by tests
}

return nil
}
82 changes: 82 additions & 0 deletions www/zmq/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package zmq

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestDefaultConfig(t *testing.T) {
cfg := DefaultConfig()

assert.NotNil(t, cfg, "DefaultConfig should not return nil")
assert.Equal(t, "", cfg.ZmqPubBlockInfo, "ZmqPubBlockInfo should be empty")
assert.Equal(t, "", cfg.ZmqPubTxInfo, "ZmqPubTxInfo should be empty")
assert.Equal(t, "", cfg.ZmqPubRawBlock, "ZmqPubRawBlock should be empty")
assert.Equal(t, "", cfg.ZmqPubRawTx, "ZmqPubRawTx should be empty")
assert.Equal(t, 1000, cfg.ZmqPubHWM, "ZmqPubHWM should default to 1000")
}

func TestBasicCheck(t *testing.T) {
testCases := []struct {
name string
config *Config
expectErr bool
}{
{
name: "Valid configuration",
config: &Config{
ZmqPubBlockInfo: "tcp://127.0.0.1:28332",
ZmqPubTxInfo: "tcp://127.0.0.1:28333",
ZmqPubRawBlock: "tcp://127.0.0.1:28334",
ZmqPubRawTx: "tcp://127.0.0.1:28335",
ZmqPubHWM: 1000,
},
expectErr: false,
},
{
name: "Invalid scheme",
config: &Config{
ZmqPubBlockInfo: "udp://127.0.0.1:28332",
},
expectErr: true,
},
{
name: "Missing port",
config: &Config{
ZmqPubBlockInfo: "tcp://127.0.0.1",
},
expectErr: true,
},
{
name: "Empty host",
config: &Config{
ZmqPubBlockInfo: "tcp://:28332",
},
expectErr: true,
},
{
name: "Negative ZmqPubHWM",
config: &Config{
ZmqPubHWM: -1,
},
expectErr: true,
},
{
name: "Empty configuration",
config: DefaultConfig(),
expectErr: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := tc.config.BasicCheck()
if tc.expectErr {
assert.Error(t, err, "BasicCheck should return an error")
} else {
assert.NoError(t, err, "BasicCheck should not return an error")
}
})
}
}
28 changes: 28 additions & 0 deletions www/zmq/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package zmq

import (
"github.com/go-zeromq/zmq4"
"github.com/pactus-project/pactus/types/block"
"github.com/pactus-project/pactus/util/logger"
)

type Publisher interface {
Address() string
TopicName() string

onNewBlock(blk *block.Block)
}

type basePub struct {
topic Topic
zmqSocket zmq4.Socket
logger *logger.SubLogger
}

func (b *basePub) Address() string {
return b.zmqSocket.Addr().String()
}

func (b *basePub) TopicName() string {
return b.topic.String()
}
Loading
Loading