From 8def158b442aaad21a26e404b7423875683697f5 Mon Sep 17 00:00:00 2001 From: Javad Date: Sat, 28 Dec 2024 15:23:17 +0330 Subject: [PATCH 01/11] feat: add zeromq configuration --- config/config.go | 11 +++++++++++ config/example_config.toml | 29 +++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/config/config.go b/config/config.go index 77429b6fa..e6bb64a73 100644 --- a/config/config.go +++ b/config/config.go @@ -20,6 +20,7 @@ import ( "github.com/pactus-project/pactus/www/http" "github.com/pactus-project/pactus/www/jsonrpc" "github.com/pactus-project/pactus/www/nanomsg" + "github.com/pactus-project/pactus/www/zmq" "github.com/pelletier/go-toml/v2" ) @@ -47,6 +48,7 @@ type Config struct { HTTP *http.Config `toml:"http"` WalletManager *wallet.Config `toml:"-"` Nanomsg *nanomsg.Config `toml:"nanomsg"` + ZeroMq *zmq.Config `toml:"zeromq"` } type BootstrapInfo struct { @@ -99,6 +101,7 @@ func defaultConfig() *Config { JSONRPC: jsonrpc.DefaultConfig(), HTTP: http.DefaultConfig(), Nanomsg: nanomsg.DefaultConfig(), + ZeroMq: zmq.DefaultConfig(), WalletManager: wallet.DefaultConfig(), } @@ -219,6 +222,11 @@ func DefaultConfigLocalnet() *Config { conf.HTTP.EnablePprof = true conf.Nanomsg.Enable = true conf.Nanomsg.Listen = "tcp://[::]:40799" + conf.ZeroMq.ZmqPubBlockInfo = "tcp://127.0.0.1:28332" + conf.ZeroMq.ZmqPubTxInfo = "tcp://127.0.0.1:28333" + conf.ZeroMq.ZmqPubRawBlock = "tcp://127.0.0.1:28334" + conf.ZeroMq.ZmqPubRawTx = "tcp://127.0.0.1:28335" + conf.ZeroMq.ZmqPubHWM = 1000 return conf } @@ -296,6 +304,9 @@ func (conf *Config) BasicCheck() error { if err := conf.GRPC.BasicCheck(); err != nil { return err } + if err := conf.ZeroMq.BasicCheck(); err != nil { + return err + } return conf.HTTP.BasicCheck() } diff --git a/config/example_config.toml b/config/example_config.toml index baebc4970..1c78a9334 100644 --- a/config/example_config.toml +++ b/config/example_config.toml @@ -246,3 +246,32 @@ # `listen` is the address for incoming connections to the nanomsg server. listen = 'tcp://127.0.0.1:40899' + +# ZeroMQ configuration. +[zeromq] + + # `zmqpubblockinfo` specifies the address for publishing block info notifications. + # Example: 'tcp://127.0.0.1:28332' + # Default is '', meaning the topic is disabled + zmqpubblockinfo = '' + + # `zmqpubtxinfo` specifies the address for publishing transaction info notifications. + # Example: 'tcp://127.0.0.1:28332' + # Default is '', meaning the topic is disabled + zmqpubtxinfo = '' + + # `zmqpubrawblock` specifies the address for publishing raw block notifications. + # Example: 'tcp://127.0.0.1:28332' + # Default is '', meaning the topic is disabled + zmqpubrawblock = '' + + # `zmqpubrawtx` specifies the address for publishing raw transaction notifications. + # Example: 'tcp://127.0.0.1:28332' + # Default is '', meaning the topic is disabled + zmqpubrawtx = '' + + # `zmqpubhwm` defines the High Watermark (HWM) for ZeroMQ message pipes. + # This parameter determines the maximum number of messages ZeroMQ can buffer before blocking the publishing of further messages. + # The watermark is applied uniformly to all active topics. + # Default is 1000 + zmqpubhwm = 1000 From 82e8bae251b19ec43860c9c6f51c13708b4eda97 Mon Sep 17 00:00:00 2001 From: Javad Date: Sat, 28 Dec 2024 15:23:32 +0330 Subject: [PATCH 02/11] feat: add zeromq logger module --- util/logger/config.go | 1 + util/logger/logger.go | 1 + 2 files changed, 2 insertions(+) diff --git a/util/logger/config.go b/util/logger/config.go index 0d5a47c31..e7018d60a 100644 --- a/util/logger/config.go +++ b/util/logger/config.go @@ -29,6 +29,7 @@ func DefaultConfig() *Config { conf.Levels["_grpc"] = "info" conf.Levels["_nonomsg"] = "info" conf.Levels["_jsonrpc"] = "info" + conf.Levels["_zmq"] = "info" conf.Levels["_firewall"] = "warn" return conf diff --git a/util/logger/logger.go b/util/logger/logger.go index 48cc1e07d..4f2c19fc9 100644 --- a/util/logger/logger.go +++ b/util/logger/logger.go @@ -54,6 +54,7 @@ func getLoggersInst() *logger { conf.Levels["_pool"] = "debug" conf.Levels["_http"] = "debug" conf.Levels["_grpc"] = "debug" + conf.Levels["_zmq"] = "debug" conf.Levels["_firewall"] = "debug" globalInst = &logger{ config: conf, From b4a1d20fa924ca4e7b4116abcbd61105cbb72095 Mon Sep 17 00:00:00 2001 From: Javad Date: Sat, 28 Dec 2024 15:23:54 +0330 Subject: [PATCH 03/11] feat: add find port to test suite --- util/testsuite/testsuite.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/util/testsuite/testsuite.go b/util/testsuite/testsuite.go index 3042b12a1..be0d7aa06 100644 --- a/util/testsuite/testsuite.go +++ b/util/testsuite/testsuite.go @@ -3,6 +3,7 @@ package testsuite import ( "encoding/hex" "math/rand" + "net" "testing" "time" @@ -866,3 +867,12 @@ func (*TestSuite) HelperSignTransaction(prv crypto.PrivateKey, trx *tx.Tx) { trx.SetSignature(sig) trx.SetPublicKey(prv.PublicKey()) } + +func FindFreePort() int { + listener, _ := net.Listen("tcp", "localhost:0") + defer func() { + _ = listener.Close() + }() + + return listener.Addr().(*net.TCPAddr).Port +} From 4c9819c0264dc4416b396ec31e38dd628f689759 Mon Sep 17 00:00:00 2001 From: Javad Date: Sat, 28 Dec 2024 15:24:22 +0330 Subject: [PATCH 04/11] feat: defined zeromq server --- go.mod | 6 +- go.sum | 8 +++ www/zmq/block_info_publisher.go | 26 +++++++ www/zmq/config.go | 92 ++++++++++++++++++++++++ www/zmq/config_test.go | 82 ++++++++++++++++++++++ www/zmq/publisher.go | 28 ++++++++ www/zmq/raw_block_publisher.go | 26 +++++++ www/zmq/raw_tx_publisher.go | 26 +++++++ www/zmq/server.go | 120 ++++++++++++++++++++++++++++++++ www/zmq/server_test.go | 71 +++++++++++++++++++ www/zmq/topic.go | 29 ++++++++ www/zmq/tx_info_publisher.go | 26 +++++++ 12 files changed, 538 insertions(+), 2 deletions(-) create mode 100644 www/zmq/block_info_publisher.go create mode 100644 www/zmq/config.go create mode 100644 www/zmq/config_test.go create mode 100644 www/zmq/publisher.go create mode 100644 www/zmq/raw_block_publisher.go create mode 100644 www/zmq/raw_tx_publisher.go create mode 100644 www/zmq/server.go create mode 100644 www/zmq/server_test.go create mode 100644 www/zmq/topic.go create mode 100644 www/zmq/tx_info_publisher.go diff --git a/go.mod b/go.mod index b00ca0517..5e400dec7 100644 --- a/go.mod +++ b/go.mod @@ -68,6 +68,8 @@ require ( github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect + github.com/go-zeromq/goczmq/v4 v4.2.2 // indirect + github.com/go-zeromq/zmq4 v0.17.0 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect @@ -173,9 +175,9 @@ require ( go.uber.org/zap v1.27.0 // indirect golang.org/x/mod v0.22.0 // indirect golang.org/x/net v0.31.0 // indirect - golang.org/x/sync v0.9.0 // indirect + golang.org/x/sync v0.10.0 // indirect golang.org/x/sys v0.27.0 // indirect - golang.org/x/text v0.20.0 // indirect + golang.org/x/text v0.21.0 // indirect golang.org/x/tools v0.27.0 // indirect gonum.org/v1/gonum v0.15.1 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20241118233622-e639e219e697 // indirect diff --git a/go.sum b/go.sum index 99f53778f..5a0a2a70e 100644 --- a/go.sum +++ b/go.sum @@ -122,6 +122,10 @@ github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEe github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0= +github.com/go-zeromq/goczmq/v4 v4.2.2 h1:HAJN+i+3NW55ijMJJhk7oWxHKXgAuSBkoFfvr8bYj4U= +github.com/go-zeromq/goczmq/v4 v4.2.2/go.mod h1:Sm/lxrfxP/Oxqs0tnHD6WAhwkWrx+S+1MRrKzcxoaYE= +github.com/go-zeromq/zmq4 v0.17.0 h1:r12/XdqPeRbuaF4C3QZJeWCt7a5vpJbslDH1rTXF+Kc= +github.com/go-zeromq/zmq4 v0.17.0/go.mod h1:EQxjJD92qKnrsVMzAnx62giD6uJIPi1dMGZ781iCDtY= github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= @@ -699,6 +703,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -769,6 +775,8 @@ golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= diff --git a/www/zmq/block_info_publisher.go b/www/zmq/block_info_publisher.go new file mode 100644 index 000000000..3b39b8248 --- /dev/null +++ b/www/zmq/block_info_publisher.go @@ -0,0 +1,26 @@ +package zmq + +import ( + "github.com/go-zeromq/zmq4" + "github.com/pactus-project/pactus/types/block" + "github.com/pactus-project/pactus/util/logger" +) + +type blockInfoPub struct { + basePub +} + +func newBlockInfoPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher { + return &blockInfoPub{ + basePub: basePub{ + topic: BlockInfo, + zmqSocket: socket, + logger: logger, + }, + } +} + +func (*blockInfoPub) onNewBlock(_ *block.Block) { + // TODO implement me + panic("implement me") +} diff --git a/www/zmq/config.go b/www/zmq/config.go new file mode 100644 index 000000000..2fd96a0e4 --- /dev/null +++ b/www/zmq/config.go @@ -0,0 +1,92 @@ +package zmq + +import ( + "errors" + "fmt" + "net/url" + "strings" + "time" +) + +type Config struct { + ZmqPubBlockInfo string `toml:"zmqpubblockinfo"` + ZmqPubTxInfo string `toml:"zmqpubtxinfo"` + ZmqPubRawBlock string `toml:"zmqpubrawblock"` + ZmqPubRawTx string `toml:"zmqpubrawtx"` + ZmqPubHWM int `toml:"zmqpubhwm"` + + // Private config + ZmqAutomaticReconnect bool `toml:"-"` + ZmqDialerRetryTime time.Duration `toml:"-"` + ZmqDialerMaxRetries int `toml:"-"` +} + +func DefaultConfig() *Config { + return &Config{ + ZmqAutomaticReconnect: true, + ZmqDialerMaxRetries: 10, + ZmqDialerRetryTime: 250 * time.Millisecond, + ZmqPubHWM: 1000, + } +} + +func (c *Config) BasicCheck() error { + if c.ZmqPubBlockInfo != "" { + if err := validateTopicSocket(c.ZmqPubBlockInfo); err != nil { + return err + } + } + + if c.ZmqPubTxInfo != "" { + if err := validateTopicSocket(c.ZmqPubTxInfo); err != nil { + return err + } + } + + if c.ZmqPubRawBlock != "" { + if err := validateTopicSocket(c.ZmqPubRawBlock); err != nil { + return err + } + } + + if c.ZmqPubRawTx != "" { + if err := validateTopicSocket(c.ZmqPubRawTx); err != nil { + return err + } + } + + if c.ZmqPubHWM < 0 { + return fmt.Errorf("invalid publisher hwm %d", c.ZmqPubHWM) + } + + return nil +} + +func validateTopicSocket(socket string) error { + addr, err := url.Parse(socket) + if err != nil { + return errors.New("failed to parse ZmqPub value: " + err.Error()) + } + + if addr.Scheme != "tcp" { + return errors.New("invalid scheme: zeromq socket schema") + } + + if addr.Host == "" { + return errors.New("invalid host: host is empty") + } + + parts := strings.Split(addr.Host, ":") + if len(parts) != 2 || parts[0] == "" || parts[1] == "" { + return errors.New("invalid host: missing or malformed host/port") + } + + port := parts[1] + for _, r := range port { + if r < '0' || r > '9' { + return errors.New("invalid port: non-numeric characters detected") + } + } + + return nil +} diff --git a/www/zmq/config_test.go b/www/zmq/config_test.go new file mode 100644 index 000000000..26c2770be --- /dev/null +++ b/www/zmq/config_test.go @@ -0,0 +1,82 @@ +package zmq + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDefaultConfig(t *testing.T) { + cfg := DefaultConfig() + + assert.NotNil(t, cfg, "DefaultConfig should not return nil") + assert.Equal(t, "", cfg.ZmqPubBlockInfo, "ZmqPubBlockInfo should be empty") + assert.Equal(t, "", cfg.ZmqPubTxInfo, "ZmqPubTxInfo should be empty") + assert.Equal(t, "", cfg.ZmqPubRawBlock, "ZmqPubRawBlock should be empty") + assert.Equal(t, "", cfg.ZmqPubRawTx, "ZmqPubRawTx should be empty") + assert.Equal(t, 1000, cfg.ZmqPubHWM, "ZmqPubHWM should default to 1000") +} + +func TestBasicCheck(t *testing.T) { + testCases := []struct { + name string + config *Config + expectErr bool + }{ + { + name: "Valid configuration", + config: &Config{ + ZmqPubBlockInfo: "tcp://127.0.0.1:28332", + ZmqPubTxInfo: "tcp://127.0.0.1:28333", + ZmqPubRawBlock: "tcp://127.0.0.1:28334", + ZmqPubRawTx: "tcp://127.0.0.1:28335", + ZmqPubHWM: 1000, + }, + expectErr: false, + }, + { + name: "Invalid scheme", + config: &Config{ + ZmqPubBlockInfo: "udp://127.0.0.1:28332", + }, + expectErr: true, + }, + { + name: "Missing port", + config: &Config{ + ZmqPubBlockInfo: "tcp://127.0.0.1", + }, + expectErr: true, + }, + { + name: "Empty host", + config: &Config{ + ZmqPubBlockInfo: "tcp://:28332", + }, + expectErr: true, + }, + { + name: "Negative ZmqPubHWM", + config: &Config{ + ZmqPubHWM: -1, + }, + expectErr: true, + }, + { + name: "Empty configuration", + config: DefaultConfig(), + expectErr: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := tc.config.BasicCheck() + if tc.expectErr { + assert.Error(t, err, "BasicCheck should return an error") + } else { + assert.NoError(t, err, "BasicCheck should not return an error") + } + }) + } +} diff --git a/www/zmq/publisher.go b/www/zmq/publisher.go new file mode 100644 index 000000000..8999c86b4 --- /dev/null +++ b/www/zmq/publisher.go @@ -0,0 +1,28 @@ +package zmq + +import ( + "github.com/go-zeromq/zmq4" + "github.com/pactus-project/pactus/types/block" + "github.com/pactus-project/pactus/util/logger" +) + +type Publisher interface { + Address() string + TopicName() string + + onNewBlock(blk *block.Block) +} + +type basePub struct { + topic Topic + zmqSocket zmq4.Socket + logger *logger.SubLogger +} + +func (b *basePub) Address() string { + return b.zmqSocket.Addr().String() +} + +func (b *basePub) TopicName() string { + return b.topic.String() +} diff --git a/www/zmq/raw_block_publisher.go b/www/zmq/raw_block_publisher.go new file mode 100644 index 000000000..6112afa30 --- /dev/null +++ b/www/zmq/raw_block_publisher.go @@ -0,0 +1,26 @@ +package zmq + +import ( + "github.com/go-zeromq/zmq4" + "github.com/pactus-project/pactus/types/block" + "github.com/pactus-project/pactus/util/logger" +) + +type rawBlockPub struct { + basePub +} + +func newRawBlockPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher { + return &rawBlockPub{ + basePub: basePub{ + topic: RawBlock, + zmqSocket: socket, + logger: logger, + }, + } +} + +func (*rawBlockPub) onNewBlock(_ *block.Block) { + // TODO implement me + panic("implement me") +} diff --git a/www/zmq/raw_tx_publisher.go b/www/zmq/raw_tx_publisher.go new file mode 100644 index 000000000..b6cfa53c7 --- /dev/null +++ b/www/zmq/raw_tx_publisher.go @@ -0,0 +1,26 @@ +package zmq + +import ( + "github.com/go-zeromq/zmq4" + "github.com/pactus-project/pactus/types/block" + "github.com/pactus-project/pactus/util/logger" +) + +type rawTxPub struct { + basePub +} + +func newRawTxPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher { + return &rawTxPub{ + basePub: basePub{ + topic: RawTransaction, + zmqSocket: socket, + logger: logger, + }, + } +} + +func (*rawTxPub) onNewBlock(_ *block.Block) { + // TODO implement me + panic("implement me") +} diff --git a/www/zmq/server.go b/www/zmq/server.go new file mode 100644 index 000000000..078931e62 --- /dev/null +++ b/www/zmq/server.go @@ -0,0 +1,120 @@ +package zmq + +import ( + "context" + + "github.com/go-zeromq/zmq4" + "github.com/pactus-project/pactus/types/block" + "github.com/pactus-project/pactus/util/logger" +) + +type Server struct { + sockets map[string]zmq4.Socket + publishers []Publisher + config *Config + eventCh <-chan any + logger *logger.SubLogger +} + +func New(ctx context.Context, conf *Config, eventCh <-chan any) (*Server, error) { + svr := &Server{ + eventCh: eventCh, + logger: logger.NewSubLogger("_zmq", nil), + publishers: make([]Publisher, 0), + sockets: make(map[string]zmq4.Socket), + config: conf, + } + + publisherOpts := []zmq4.Option{ + zmq4.WithAutomaticReconnect(conf.ZmqAutomaticReconnect), + zmq4.WithDialerRetry(conf.ZmqDialerRetryTime), + zmq4.WithDialerMaxRetries(conf.ZmqDialerMaxRetries), + } + + makePublisher := func(addr string, newPublisher func(socket zmq4.Socket, logger *logger.SubLogger) Publisher) error { + if addr == "" { + return nil + } + + socket, ok := svr.sockets[addr] + if !ok { + socket = zmq4.NewPub(ctx, publisherOpts...) + + if err := socket.SetOption(zmq4.OptionHWM, conf.ZmqPubHWM); err != nil { + return err + } + + if err := socket.Listen(addr); err != nil { + return err + } + } + + pub := newPublisher(socket, svr.logger) + + svr.publishers = append(svr.publishers, pub) + + svr.sockets[addr] = socket + + svr.logger.Info("publisher initialized", "topic", pub.TopicName(), "socket", addr) + + return nil + } + + if err := makePublisher(conf.ZmqPubBlockInfo, newBlockInfoPub); err != nil { + return nil, err + } + + if err := makePublisher(conf.ZmqPubTxInfo, newTxInfoPub); err != nil { + return nil, err + } + + if err := makePublisher(conf.ZmqPubRawBlock, newRawBlockPub); err != nil { + return nil, err + } + + if err := makePublisher(conf.ZmqPubRawTx, newRawTxPub); err != nil { + return nil, err + } + + go svr.receivedEventLoop(ctx) + + return svr, nil +} + +func (s *Server) Publishers() []Publisher { + return s.publishers +} + +func (s *Server) HWM() int { + return s.config.ZmqPubHWM +} + +func (s *Server) Close() { + for _, sock := range s.sockets { + if err := sock.Close(); err != nil { + s.logger.Error("failed to close socket", "err", err) + } + } +} + +func (s *Server) receivedEventLoop(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case event, ok := <-s.eventCh: + if !ok { + return + } + + switch ev := event.(type) { + case *block.Block: + for _, pub := range s.publishers { + pub.onNewBlock(ev) + } + default: + panic("invalid event type") + } + } + } +} diff --git a/www/zmq/server_test.go b/www/zmq/server_test.go new file mode 100644 index 000000000..5a43a175f --- /dev/null +++ b/www/zmq/server_test.go @@ -0,0 +1,71 @@ +package zmq + +import ( + "context" + "fmt" + "testing" + + "github.com/pactus-project/pactus/state" + "github.com/pactus-project/pactus/util/testsuite" + "github.com/stretchr/testify/require" +) + +type testData struct { + *testsuite.TestSuite + + mockState *state.MockState + server *Server + eventCh chan any +} + +func setup(ctx context.Context, t *testing.T, conf *Config) *testData { + t.Helper() + + ts := testsuite.NewTestSuite(t) + mockState := state.MockingState(ts) + eventCh := make(chan any) + sv, err := New(ctx, conf, eventCh) + require.NoError(t, err) + + return &testData{ + TestSuite: ts, + server: sv, + mockState: mockState, + eventCh: eventCh, + } +} + +func (t *testData) cleanup() func() { + return func() { + t.server.Close() + } +} + +func TestTopicsWithSameSocket(t *testing.T) { + port := testsuite.FindFreePort() + addr := fmt.Sprintf("tcp://127.0.0.1:%d", port) + + conf := DefaultConfig() + conf.ZmqPubBlockInfo = addr + conf.ZmqPubTxInfo = addr + conf.ZmqPubRawBlock = addr + conf.ZmqPubRawTx = addr + + ts := setup(context.TODO(), t, conf) + t.Cleanup(ts.cleanup()) + + require.Len(t, ts.server.publishers, 4) +} + +func TestTopicsWithDifferentSockets(t *testing.T) { + conf := DefaultConfig() + conf.ZmqPubBlockInfo = fmt.Sprintf("tcp://127.0.0.1:%d", testsuite.FindFreePort()) + conf.ZmqPubTxInfo = fmt.Sprintf("tcp://127.0.0.1:%d", testsuite.FindFreePort()) + conf.ZmqPubRawBlock = fmt.Sprintf("tcp://127.0.0.1:%d", testsuite.FindFreePort()) + conf.ZmqPubRawTx = fmt.Sprintf("tcp://127.0.0.1:%d", testsuite.FindFreePort()) + + ts := setup(context.TODO(), t, conf) + t.Cleanup(ts.cleanup()) + + require.Len(t, ts.server.publishers, 4) +} diff --git a/www/zmq/topic.go b/www/zmq/topic.go new file mode 100644 index 000000000..fb2411e74 --- /dev/null +++ b/www/zmq/topic.go @@ -0,0 +1,29 @@ +package zmq + +type Topic int16 + +const ( + BlockInfo Topic = 0x0001 + TransactionInfo Topic = 0x0002 + RawBlock Topic = 0x0003 + RawTransaction Topic = 0x0004 +) + +func (t Topic) String() string { + switch t { + case BlockInfo: + return "block_info" + + case TransactionInfo: + return "transaction_info" + + case RawBlock: + return "raw_block" + + case RawTransaction: + return "raw_transaction" + + default: + return "" + } +} diff --git a/www/zmq/tx_info_publisher.go b/www/zmq/tx_info_publisher.go new file mode 100644 index 000000000..e9e0ab0e9 --- /dev/null +++ b/www/zmq/tx_info_publisher.go @@ -0,0 +1,26 @@ +package zmq + +import ( + "github.com/go-zeromq/zmq4" + "github.com/pactus-project/pactus/types/block" + "github.com/pactus-project/pactus/util/logger" +) + +type txInfoPub struct { + basePub +} + +func newTxInfoPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher { + return &txInfoPub{ + basePub: basePub{ + topic: TransactionInfo, + zmqSocket: socket, + logger: logger, + }, + } +} + +func (*txInfoPub) onNewBlock(_ *block.Block) { + // TODO implement me + panic("implement me") +} From 379bc5228c64e85c0b2ba3607c7b8f4eabe5f4d8 Mon Sep 17 00:00:00 2001 From: Javad Date: Sat, 28 Dec 2024 15:25:53 +0330 Subject: [PATCH 05/11] chore: add zeromq to semantic workflow --- .github/workflows/semantic-pr.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/semantic-pr.yml b/.github/workflows/semantic-pr.yml index 4ec1a05eb..6c65462ff 100644 --- a/.github/workflows/semantic-pr.yml +++ b/.github/workflows/semantic-pr.yml @@ -60,6 +60,7 @@ jobs: http jsonrpc nanomsg + zeromq windows linux macos From 67cb0c68c01778d380f0f4b2a6e5bf5a150112ad Mon Sep 17 00:00:00 2001 From: Javad Date: Sat, 28 Dec 2024 15:37:11 +0330 Subject: [PATCH 06/11] fix: example configration test --- config/example_config.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/config/example_config.toml b/config/example_config.toml index 1c78a9334..f01b25dc6 100644 --- a/config/example_config.toml +++ b/config/example_config.toml @@ -177,6 +177,7 @@ _pool = 'error' _state = 'info' _sync = 'error' + _zmq = 'info' default = 'info' # `grpc` contains configuration of the gRPC module. From 3a546b968cd2a2250f16c4f0a5f67776ebade112 Mon Sep 17 00:00:00 2001 From: Javad Date: Sun, 29 Dec 2024 08:45:02 +0330 Subject: [PATCH 07/11] fix: conflicts --- go.mod | 2 ++ go.sum | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/go.mod b/go.mod index 895aa0751..0ef046a6a 100644 --- a/go.mod +++ b/go.mod @@ -68,6 +68,8 @@ require ( github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect + github.com/go-zeromq/goczmq/v4 v4.2.2 // indirect + github.com/go-zeromq/zmq4 v0.17.0 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect diff --git a/go.sum b/go.sum index 240c352a9..b6ed4d140 100644 --- a/go.sum +++ b/go.sum @@ -116,6 +116,10 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0= +github.com/go-zeromq/goczmq/v4 v4.2.2 h1:HAJN+i+3NW55ijMJJhk7oWxHKXgAuSBkoFfvr8bYj4U= +github.com/go-zeromq/goczmq/v4 v4.2.2/go.mod h1:Sm/lxrfxP/Oxqs0tnHD6WAhwkWrx+S+1MRrKzcxoaYE= +github.com/go-zeromq/zmq4 v0.17.0 h1:r12/XdqPeRbuaF4C3QZJeWCt7a5vpJbslDH1rTXF+Kc= +github.com/go-zeromq/zmq4 v0.17.0/go.mod h1:EQxjJD92qKnrsVMzAnx62giD6uJIPi1dMGZ781iCDtY= github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= From 76dcc5b6d8d4affe32232d3f2533dba8fe8f2bbc Mon Sep 17 00:00:00 2001 From: Javad Date: Sun, 29 Dec 2024 09:12:05 +0330 Subject: [PATCH 08/11] fix: improve tests --- util/testsuite/testsuite.go | 2 +- www/zmq/server.go | 2 + www/zmq/server_test.go | 73 ++++++++++++++++++++++++++++--------- 3 files changed, 59 insertions(+), 18 deletions(-) diff --git a/util/testsuite/testsuite.go b/util/testsuite/testsuite.go index be0d7aa06..a25acbe75 100644 --- a/util/testsuite/testsuite.go +++ b/util/testsuite/testsuite.go @@ -868,7 +868,7 @@ func (*TestSuite) HelperSignTransaction(prv crypto.PrivateKey, trx *tx.Tx) { trx.SetPublicKey(prv.PublicKey()) } -func FindFreePort() int { +func (*TestSuite) FindFreePort() int { listener, _ := net.Listen("tcp", "localhost:0") defer func() { _ = listener.Close() diff --git a/www/zmq/server.go b/www/zmq/server.go index 078931e62..6c94b4ae4 100644 --- a/www/zmq/server.go +++ b/www/zmq/server.go @@ -104,6 +104,8 @@ func (s *Server) receivedEventLoop(ctx context.Context) { return case event, ok := <-s.eventCh: if !ok { + s.logger.Warn("event channel closed") + return } diff --git a/www/zmq/server_test.go b/www/zmq/server_test.go index 5a43a175f..932089841 100644 --- a/www/zmq/server_test.go +++ b/www/zmq/server_test.go @@ -3,6 +3,7 @@ package zmq import ( "context" "fmt" + "github.com/stretchr/testify/assert" "testing" "github.com/pactus-project/pactus/state" @@ -18,31 +19,60 @@ type testData struct { eventCh chan any } -func setup(ctx context.Context, t *testing.T, conf *Config) *testData { +func setup(t *testing.T) *testData { t.Helper() ts := testsuite.NewTestSuite(t) mockState := state.MockingState(ts) - eventCh := make(chan any) - sv, err := New(ctx, conf, eventCh) - require.NoError(t, err) return &testData{ TestSuite: ts, - server: sv, mockState: mockState, - eventCh: eventCh, } } -func (t *testData) cleanup() func() { +func (ts *testData) initServer(ctx context.Context, conf *Config) error { + eventCh := make(chan any) + sv, err := New(ctx, conf, eventCh) + if err != nil { + return err + } + + ts.server = sv + ts.eventCh = eventCh + + return nil +} + +func (ts *testData) resetServer() { + ts.server = nil + ts.eventCh = nil +} + +func (ts *testData) cleanup() func() { return func() { - t.server.Close() + ts.server.Close() + ts.resetServer() } } +func TestServerWithDefaultConfig(t *testing.T) { + ts := setup(t) + + conf := DefaultConfig() + + err := ts.initServer(context.TODO(), conf) + t.Cleanup(ts.cleanup()) + + assert.NoError(t, err) + require.NotNil(t, ts.server) +} + func TestTopicsWithSameSocket(t *testing.T) { - port := testsuite.FindFreePort() + ts := setup(t) + t.Cleanup(ts.cleanup()) + + port := ts.FindFreePort() addr := fmt.Sprintf("tcp://127.0.0.1:%d", port) conf := DefaultConfig() @@ -51,21 +81,30 @@ func TestTopicsWithSameSocket(t *testing.T) { conf.ZmqPubRawBlock = addr conf.ZmqPubRawTx = addr - ts := setup(context.TODO(), t, conf) - t.Cleanup(ts.cleanup()) + err := ts.initServer(context.TODO(), conf) + require.NoError(t, err) require.Len(t, ts.server.publishers, 4) + + expectedAddr := ts.server.publishers[0].Address() + + for _, pub := range ts.server.publishers { + require.Equal(t, expectedAddr, pub.Address(), "All publishers must have the same address") + } } func TestTopicsWithDifferentSockets(t *testing.T) { + ts := setup(t) + t.Cleanup(ts.cleanup()) + conf := DefaultConfig() - conf.ZmqPubBlockInfo = fmt.Sprintf("tcp://127.0.0.1:%d", testsuite.FindFreePort()) - conf.ZmqPubTxInfo = fmt.Sprintf("tcp://127.0.0.1:%d", testsuite.FindFreePort()) - conf.ZmqPubRawBlock = fmt.Sprintf("tcp://127.0.0.1:%d", testsuite.FindFreePort()) - conf.ZmqPubRawTx = fmt.Sprintf("tcp://127.0.0.1:%d", testsuite.FindFreePort()) + conf.ZmqPubBlockInfo = fmt.Sprintf("tcp://127.0.0.1:%d", ts.FindFreePort()) + conf.ZmqPubTxInfo = fmt.Sprintf("tcp://127.0.0.1:%d", ts.FindFreePort()) + conf.ZmqPubRawBlock = fmt.Sprintf("tcp://127.0.0.1:%d", ts.FindFreePort()) + conf.ZmqPubRawTx = fmt.Sprintf("tcp://127.0.0.1:%d", ts.FindFreePort()) - ts := setup(context.TODO(), t, conf) - t.Cleanup(ts.cleanup()) + err := ts.initServer(context.TODO(), conf) + require.NoError(t, err) require.Len(t, ts.server.publishers, 4) } From 8fce96e9d0a9276f222de27cc132a9dd4e57dabb Mon Sep 17 00:00:00 2001 From: Javad Date: Sun, 29 Dec 2024 09:19:28 +0330 Subject: [PATCH 09/11] tests: add test for base publisher --- www/zmq/publisher_test.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 www/zmq/publisher_test.go diff --git a/www/zmq/publisher_test.go b/www/zmq/publisher_test.go new file mode 100644 index 000000000..4127805f9 --- /dev/null +++ b/www/zmq/publisher_test.go @@ -0,0 +1,28 @@ +package zmq + +import ( + "context" + "fmt" + "github.com/go-zeromq/zmq4" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "net/url" + "testing" +) + +func TestBasePublisher(t *testing.T) { + ts := setup(t) + + topic := BlockInfo + addr, err := url.Parse(fmt.Sprintf("tcp://127.0.0.1:%d", ts.FindFreePort())) + require.NoError(t, err) + + base := &basePub{ + topic: topic, + zmqSocket: zmq4.NewPub(context.TODO()), + } + + require.NoError(t, base.zmqSocket.Listen(addr.String())) + assert.Equal(t, base.Address(), addr.Host) + assert.Equal(t, base.TopicName(), topic.String()) +} From dad885382ec79fa78f47a1181c5ea3b565f7e1f1 Mon Sep 17 00:00:00 2001 From: Javad Date: Sun, 29 Dec 2024 09:21:39 +0330 Subject: [PATCH 10/11] fix: linter errors --- www/zmq/publisher_test.go | 9 +++++---- www/zmq/server_test.go | 40 +++++++++++++++++++-------------------- 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/www/zmq/publisher_test.go b/www/zmq/publisher_test.go index 4127805f9..47c1bb549 100644 --- a/www/zmq/publisher_test.go +++ b/www/zmq/publisher_test.go @@ -3,18 +3,19 @@ package zmq import ( "context" "fmt" + "net/url" + "testing" + "github.com/go-zeromq/zmq4" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "net/url" - "testing" ) func TestBasePublisher(t *testing.T) { - ts := setup(t) + suite := setup(t) topic := BlockInfo - addr, err := url.Parse(fmt.Sprintf("tcp://127.0.0.1:%d", ts.FindFreePort())) + addr, err := url.Parse(fmt.Sprintf("tcp://127.0.0.1:%d", suite.FindFreePort())) require.NoError(t, err) base := &basePub{ diff --git a/www/zmq/server_test.go b/www/zmq/server_test.go index 932089841..b8ad269ac 100644 --- a/www/zmq/server_test.go +++ b/www/zmq/server_test.go @@ -3,11 +3,11 @@ package zmq import ( "context" "fmt" - "github.com/stretchr/testify/assert" "testing" "github.com/pactus-project/pactus/state" "github.com/pactus-project/pactus/util/testsuite" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -57,22 +57,22 @@ func (ts *testData) cleanup() func() { } func TestServerWithDefaultConfig(t *testing.T) { - ts := setup(t) + suite := setup(t) conf := DefaultConfig() - err := ts.initServer(context.TODO(), conf) - t.Cleanup(ts.cleanup()) + err := suite.initServer(context.TODO(), conf) + t.Cleanup(suite.cleanup()) assert.NoError(t, err) - require.NotNil(t, ts.server) + require.NotNil(t, suite.server) } func TestTopicsWithSameSocket(t *testing.T) { - ts := setup(t) - t.Cleanup(ts.cleanup()) + suite := setup(t) + t.Cleanup(suite.cleanup()) - port := ts.FindFreePort() + port := suite.FindFreePort() addr := fmt.Sprintf("tcp://127.0.0.1:%d", port) conf := DefaultConfig() @@ -81,30 +81,30 @@ func TestTopicsWithSameSocket(t *testing.T) { conf.ZmqPubRawBlock = addr conf.ZmqPubRawTx = addr - err := ts.initServer(context.TODO(), conf) + err := suite.initServer(context.TODO(), conf) require.NoError(t, err) - require.Len(t, ts.server.publishers, 4) + require.Len(t, suite.server.publishers, 4) - expectedAddr := ts.server.publishers[0].Address() + expectedAddr := suite.server.publishers[0].Address() - for _, pub := range ts.server.publishers { + for _, pub := range suite.server.publishers { require.Equal(t, expectedAddr, pub.Address(), "All publishers must have the same address") } } func TestTopicsWithDifferentSockets(t *testing.T) { - ts := setup(t) - t.Cleanup(ts.cleanup()) + suite := setup(t) + t.Cleanup(suite.cleanup()) conf := DefaultConfig() - conf.ZmqPubBlockInfo = fmt.Sprintf("tcp://127.0.0.1:%d", ts.FindFreePort()) - conf.ZmqPubTxInfo = fmt.Sprintf("tcp://127.0.0.1:%d", ts.FindFreePort()) - conf.ZmqPubRawBlock = fmt.Sprintf("tcp://127.0.0.1:%d", ts.FindFreePort()) - conf.ZmqPubRawTx = fmt.Sprintf("tcp://127.0.0.1:%d", ts.FindFreePort()) + conf.ZmqPubBlockInfo = fmt.Sprintf("tcp://127.0.0.1:%d", suite.FindFreePort()) + conf.ZmqPubTxInfo = fmt.Sprintf("tcp://127.0.0.1:%d", suite.FindFreePort()) + conf.ZmqPubRawBlock = fmt.Sprintf("tcp://127.0.0.1:%d", suite.FindFreePort()) + conf.ZmqPubRawTx = fmt.Sprintf("tcp://127.0.0.1:%d", suite.FindFreePort()) - err := ts.initServer(context.TODO(), conf) + err := suite.initServer(context.TODO(), conf) require.NoError(t, err) - require.Len(t, ts.server.publishers, 4) + require.Len(t, suite.server.publishers, 4) } From 2af216ea0420c31e41650682a14825697083b49f Mon Sep 17 00:00:00 2001 From: Javad Date: Wed, 8 Jan 2025 10:11:33 +0330 Subject: [PATCH 11/11] fix: remove empty lines --- go.mod | 2 +- www/zmq/server.go | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/go.mod b/go.mod index bac2a1ae4..7ef1a78e6 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/c-bata/go-prompt v0.2.6 github.com/consensys/gnark-crypto v0.14.0 github.com/fxamacker/cbor/v2 v2.7.0 + github.com/go-zeromq/zmq4 v0.17.0 github.com/gofrs/flock v0.12.1 github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 github.com/google/uuid v1.6.0 @@ -69,7 +70,6 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/go-zeromq/goczmq/v4 v4.2.2 // indirect - github.com/go-zeromq/zmq4 v0.17.0 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect diff --git a/www/zmq/server.go b/www/zmq/server.go index 6c94b4ae4..11e887afe 100644 --- a/www/zmq/server.go +++ b/www/zmq/server.go @@ -50,9 +50,7 @@ func New(ctx context.Context, conf *Config, eventCh <-chan any) (*Server, error) } pub := newPublisher(socket, svr.logger) - svr.publishers = append(svr.publishers, pub) - svr.sockets[addr] = socket svr.logger.Info("publisher initialized", "topic", pub.TopicName(), "socket", addr)