diff --git a/.github/workflows/semantic-pr.yml b/.github/workflows/semantic-pr.yml index 4ec1a05eb..6c65462ff 100644 --- a/.github/workflows/semantic-pr.yml +++ b/.github/workflows/semantic-pr.yml @@ -60,6 +60,7 @@ jobs: http jsonrpc nanomsg + zeromq windows linux macos diff --git a/config/config.go b/config/config.go index 77429b6fa..e6bb64a73 100644 --- a/config/config.go +++ b/config/config.go @@ -20,6 +20,7 @@ import ( "github.com/pactus-project/pactus/www/http" "github.com/pactus-project/pactus/www/jsonrpc" "github.com/pactus-project/pactus/www/nanomsg" + "github.com/pactus-project/pactus/www/zmq" "github.com/pelletier/go-toml/v2" ) @@ -47,6 +48,7 @@ type Config struct { HTTP *http.Config `toml:"http"` WalletManager *wallet.Config `toml:"-"` Nanomsg *nanomsg.Config `toml:"nanomsg"` + ZeroMq *zmq.Config `toml:"zeromq"` } type BootstrapInfo struct { @@ -99,6 +101,7 @@ func defaultConfig() *Config { JSONRPC: jsonrpc.DefaultConfig(), HTTP: http.DefaultConfig(), Nanomsg: nanomsg.DefaultConfig(), + ZeroMq: zmq.DefaultConfig(), WalletManager: wallet.DefaultConfig(), } @@ -219,6 +222,11 @@ func DefaultConfigLocalnet() *Config { conf.HTTP.EnablePprof = true conf.Nanomsg.Enable = true conf.Nanomsg.Listen = "tcp://[::]:40799" + conf.ZeroMq.ZmqPubBlockInfo = "tcp://127.0.0.1:28332" + conf.ZeroMq.ZmqPubTxInfo = "tcp://127.0.0.1:28333" + conf.ZeroMq.ZmqPubRawBlock = "tcp://127.0.0.1:28334" + conf.ZeroMq.ZmqPubRawTx = "tcp://127.0.0.1:28335" + conf.ZeroMq.ZmqPubHWM = 1000 return conf } @@ -296,6 +304,9 @@ func (conf *Config) BasicCheck() error { if err := conf.GRPC.BasicCheck(); err != nil { return err } + if err := conf.ZeroMq.BasicCheck(); err != nil { + return err + } return conf.HTTP.BasicCheck() } diff --git a/config/example_config.toml b/config/example_config.toml index baebc4970..f01b25dc6 100644 --- a/config/example_config.toml +++ b/config/example_config.toml @@ -177,6 +177,7 @@ _pool = 'error' _state = 'info' _sync = 'error' + _zmq = 'info' default = 'info' # `grpc` contains configuration of the gRPC module. @@ -246,3 +247,32 @@ # `listen` is the address for incoming connections to the nanomsg server. listen = 'tcp://127.0.0.1:40899' + +# ZeroMQ configuration. +[zeromq] + + # `zmqpubblockinfo` specifies the address for publishing block info notifications. + # Example: 'tcp://127.0.0.1:28332' + # Default is '', meaning the topic is disabled + zmqpubblockinfo = '' + + # `zmqpubtxinfo` specifies the address for publishing transaction info notifications. + # Example: 'tcp://127.0.0.1:28332' + # Default is '', meaning the topic is disabled + zmqpubtxinfo = '' + + # `zmqpubrawblock` specifies the address for publishing raw block notifications. + # Example: 'tcp://127.0.0.1:28332' + # Default is '', meaning the topic is disabled + zmqpubrawblock = '' + + # `zmqpubrawtx` specifies the address for publishing raw transaction notifications. + # Example: 'tcp://127.0.0.1:28332' + # Default is '', meaning the topic is disabled + zmqpubrawtx = '' + + # `zmqpubhwm` defines the High Watermark (HWM) for ZeroMQ message pipes. + # This parameter determines the maximum number of messages ZeroMQ can buffer before blocking the publishing of further messages. + # The watermark is applied uniformly to all active topics. + # Default is 1000 + zmqpubhwm = 1000 diff --git a/go.mod b/go.mod index 89eab06c0..7ef1a78e6 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/c-bata/go-prompt v0.2.6 github.com/consensys/gnark-crypto v0.14.0 github.com/fxamacker/cbor/v2 v2.7.0 + github.com/go-zeromq/zmq4 v0.17.0 github.com/gofrs/flock v0.12.1 github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 github.com/google/uuid v1.6.0 @@ -68,6 +69,7 @@ require ( github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect + github.com/go-zeromq/goczmq/v4 v4.2.2 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect diff --git a/go.sum b/go.sum index e857bc505..a63aa3964 100644 --- a/go.sum +++ b/go.sum @@ -116,6 +116,10 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0= +github.com/go-zeromq/goczmq/v4 v4.2.2 h1:HAJN+i+3NW55ijMJJhk7oWxHKXgAuSBkoFfvr8bYj4U= +github.com/go-zeromq/goczmq/v4 v4.2.2/go.mod h1:Sm/lxrfxP/Oxqs0tnHD6WAhwkWrx+S+1MRrKzcxoaYE= +github.com/go-zeromq/zmq4 v0.17.0 h1:r12/XdqPeRbuaF4C3QZJeWCt7a5vpJbslDH1rTXF+Kc= +github.com/go-zeromq/zmq4 v0.17.0/go.mod h1:EQxjJD92qKnrsVMzAnx62giD6uJIPi1dMGZ781iCDtY= github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= diff --git a/util/logger/config.go b/util/logger/config.go index 0d5a47c31..e7018d60a 100644 --- a/util/logger/config.go +++ b/util/logger/config.go @@ -29,6 +29,7 @@ func DefaultConfig() *Config { conf.Levels["_grpc"] = "info" conf.Levels["_nonomsg"] = "info" conf.Levels["_jsonrpc"] = "info" + conf.Levels["_zmq"] = "info" conf.Levels["_firewall"] = "warn" return conf diff --git a/util/logger/logger.go b/util/logger/logger.go index 48cc1e07d..4f2c19fc9 100644 --- a/util/logger/logger.go +++ b/util/logger/logger.go @@ -54,6 +54,7 @@ func getLoggersInst() *logger { conf.Levels["_pool"] = "debug" conf.Levels["_http"] = "debug" conf.Levels["_grpc"] = "debug" + conf.Levels["_zmq"] = "debug" conf.Levels["_firewall"] = "debug" globalInst = &logger{ config: conf, diff --git a/util/testsuite/testsuite.go b/util/testsuite/testsuite.go index 3042b12a1..a25acbe75 100644 --- a/util/testsuite/testsuite.go +++ b/util/testsuite/testsuite.go @@ -3,6 +3,7 @@ package testsuite import ( "encoding/hex" "math/rand" + "net" "testing" "time" @@ -866,3 +867,12 @@ func (*TestSuite) HelperSignTransaction(prv crypto.PrivateKey, trx *tx.Tx) { trx.SetSignature(sig) trx.SetPublicKey(prv.PublicKey()) } + +func (*TestSuite) FindFreePort() int { + listener, _ := net.Listen("tcp", "localhost:0") + defer func() { + _ = listener.Close() + }() + + return listener.Addr().(*net.TCPAddr).Port +} diff --git a/www/zmq/block_info_publisher.go b/www/zmq/block_info_publisher.go new file mode 100644 index 000000000..3b39b8248 --- /dev/null +++ b/www/zmq/block_info_publisher.go @@ -0,0 +1,26 @@ +package zmq + +import ( + "github.com/go-zeromq/zmq4" + "github.com/pactus-project/pactus/types/block" + "github.com/pactus-project/pactus/util/logger" +) + +type blockInfoPub struct { + basePub +} + +func newBlockInfoPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher { + return &blockInfoPub{ + basePub: basePub{ + topic: BlockInfo, + zmqSocket: socket, + logger: logger, + }, + } +} + +func (*blockInfoPub) onNewBlock(_ *block.Block) { + // TODO implement me + panic("implement me") +} diff --git a/www/zmq/config.go b/www/zmq/config.go new file mode 100644 index 000000000..2fd96a0e4 --- /dev/null +++ b/www/zmq/config.go @@ -0,0 +1,92 @@ +package zmq + +import ( + "errors" + "fmt" + "net/url" + "strings" + "time" +) + +type Config struct { + ZmqPubBlockInfo string `toml:"zmqpubblockinfo"` + ZmqPubTxInfo string `toml:"zmqpubtxinfo"` + ZmqPubRawBlock string `toml:"zmqpubrawblock"` + ZmqPubRawTx string `toml:"zmqpubrawtx"` + ZmqPubHWM int `toml:"zmqpubhwm"` + + // Private config + ZmqAutomaticReconnect bool `toml:"-"` + ZmqDialerRetryTime time.Duration `toml:"-"` + ZmqDialerMaxRetries int `toml:"-"` +} + +func DefaultConfig() *Config { + return &Config{ + ZmqAutomaticReconnect: true, + ZmqDialerMaxRetries: 10, + ZmqDialerRetryTime: 250 * time.Millisecond, + ZmqPubHWM: 1000, + } +} + +func (c *Config) BasicCheck() error { + if c.ZmqPubBlockInfo != "" { + if err := validateTopicSocket(c.ZmqPubBlockInfo); err != nil { + return err + } + } + + if c.ZmqPubTxInfo != "" { + if err := validateTopicSocket(c.ZmqPubTxInfo); err != nil { + return err + } + } + + if c.ZmqPubRawBlock != "" { + if err := validateTopicSocket(c.ZmqPubRawBlock); err != nil { + return err + } + } + + if c.ZmqPubRawTx != "" { + if err := validateTopicSocket(c.ZmqPubRawTx); err != nil { + return err + } + } + + if c.ZmqPubHWM < 0 { + return fmt.Errorf("invalid publisher hwm %d", c.ZmqPubHWM) + } + + return nil +} + +func validateTopicSocket(socket string) error { + addr, err := url.Parse(socket) + if err != nil { + return errors.New("failed to parse ZmqPub value: " + err.Error()) + } + + if addr.Scheme != "tcp" { + return errors.New("invalid scheme: zeromq socket schema") + } + + if addr.Host == "" { + return errors.New("invalid host: host is empty") + } + + parts := strings.Split(addr.Host, ":") + if len(parts) != 2 || parts[0] == "" || parts[1] == "" { + return errors.New("invalid host: missing or malformed host/port") + } + + port := parts[1] + for _, r := range port { + if r < '0' || r > '9' { + return errors.New("invalid port: non-numeric characters detected") + } + } + + return nil +} diff --git a/www/zmq/config_test.go b/www/zmq/config_test.go new file mode 100644 index 000000000..26c2770be --- /dev/null +++ b/www/zmq/config_test.go @@ -0,0 +1,82 @@ +package zmq + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDefaultConfig(t *testing.T) { + cfg := DefaultConfig() + + assert.NotNil(t, cfg, "DefaultConfig should not return nil") + assert.Equal(t, "", cfg.ZmqPubBlockInfo, "ZmqPubBlockInfo should be empty") + assert.Equal(t, "", cfg.ZmqPubTxInfo, "ZmqPubTxInfo should be empty") + assert.Equal(t, "", cfg.ZmqPubRawBlock, "ZmqPubRawBlock should be empty") + assert.Equal(t, "", cfg.ZmqPubRawTx, "ZmqPubRawTx should be empty") + assert.Equal(t, 1000, cfg.ZmqPubHWM, "ZmqPubHWM should default to 1000") +} + +func TestBasicCheck(t *testing.T) { + testCases := []struct { + name string + config *Config + expectErr bool + }{ + { + name: "Valid configuration", + config: &Config{ + ZmqPubBlockInfo: "tcp://127.0.0.1:28332", + ZmqPubTxInfo: "tcp://127.0.0.1:28333", + ZmqPubRawBlock: "tcp://127.0.0.1:28334", + ZmqPubRawTx: "tcp://127.0.0.1:28335", + ZmqPubHWM: 1000, + }, + expectErr: false, + }, + { + name: "Invalid scheme", + config: &Config{ + ZmqPubBlockInfo: "udp://127.0.0.1:28332", + }, + expectErr: true, + }, + { + name: "Missing port", + config: &Config{ + ZmqPubBlockInfo: "tcp://127.0.0.1", + }, + expectErr: true, + }, + { + name: "Empty host", + config: &Config{ + ZmqPubBlockInfo: "tcp://:28332", + }, + expectErr: true, + }, + { + name: "Negative ZmqPubHWM", + config: &Config{ + ZmqPubHWM: -1, + }, + expectErr: true, + }, + { + name: "Empty configuration", + config: DefaultConfig(), + expectErr: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := tc.config.BasicCheck() + if tc.expectErr { + assert.Error(t, err, "BasicCheck should return an error") + } else { + assert.NoError(t, err, "BasicCheck should not return an error") + } + }) + } +} diff --git a/www/zmq/publisher.go b/www/zmq/publisher.go new file mode 100644 index 000000000..8999c86b4 --- /dev/null +++ b/www/zmq/publisher.go @@ -0,0 +1,28 @@ +package zmq + +import ( + "github.com/go-zeromq/zmq4" + "github.com/pactus-project/pactus/types/block" + "github.com/pactus-project/pactus/util/logger" +) + +type Publisher interface { + Address() string + TopicName() string + + onNewBlock(blk *block.Block) +} + +type basePub struct { + topic Topic + zmqSocket zmq4.Socket + logger *logger.SubLogger +} + +func (b *basePub) Address() string { + return b.zmqSocket.Addr().String() +} + +func (b *basePub) TopicName() string { + return b.topic.String() +} diff --git a/www/zmq/publisher_test.go b/www/zmq/publisher_test.go new file mode 100644 index 000000000..47c1bb549 --- /dev/null +++ b/www/zmq/publisher_test.go @@ -0,0 +1,29 @@ +package zmq + +import ( + "context" + "fmt" + "net/url" + "testing" + + "github.com/go-zeromq/zmq4" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBasePublisher(t *testing.T) { + suite := setup(t) + + topic := BlockInfo + addr, err := url.Parse(fmt.Sprintf("tcp://127.0.0.1:%d", suite.FindFreePort())) + require.NoError(t, err) + + base := &basePub{ + topic: topic, + zmqSocket: zmq4.NewPub(context.TODO()), + } + + require.NoError(t, base.zmqSocket.Listen(addr.String())) + assert.Equal(t, base.Address(), addr.Host) + assert.Equal(t, base.TopicName(), topic.String()) +} diff --git a/www/zmq/raw_block_publisher.go b/www/zmq/raw_block_publisher.go new file mode 100644 index 000000000..6112afa30 --- /dev/null +++ b/www/zmq/raw_block_publisher.go @@ -0,0 +1,26 @@ +package zmq + +import ( + "github.com/go-zeromq/zmq4" + "github.com/pactus-project/pactus/types/block" + "github.com/pactus-project/pactus/util/logger" +) + +type rawBlockPub struct { + basePub +} + +func newRawBlockPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher { + return &rawBlockPub{ + basePub: basePub{ + topic: RawBlock, + zmqSocket: socket, + logger: logger, + }, + } +} + +func (*rawBlockPub) onNewBlock(_ *block.Block) { + // TODO implement me + panic("implement me") +} diff --git a/www/zmq/raw_tx_publisher.go b/www/zmq/raw_tx_publisher.go new file mode 100644 index 000000000..b6cfa53c7 --- /dev/null +++ b/www/zmq/raw_tx_publisher.go @@ -0,0 +1,26 @@ +package zmq + +import ( + "github.com/go-zeromq/zmq4" + "github.com/pactus-project/pactus/types/block" + "github.com/pactus-project/pactus/util/logger" +) + +type rawTxPub struct { + basePub +} + +func newRawTxPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher { + return &rawTxPub{ + basePub: basePub{ + topic: RawTransaction, + zmqSocket: socket, + logger: logger, + }, + } +} + +func (*rawTxPub) onNewBlock(_ *block.Block) { + // TODO implement me + panic("implement me") +} diff --git a/www/zmq/server.go b/www/zmq/server.go new file mode 100644 index 000000000..11e887afe --- /dev/null +++ b/www/zmq/server.go @@ -0,0 +1,120 @@ +package zmq + +import ( + "context" + + "github.com/go-zeromq/zmq4" + "github.com/pactus-project/pactus/types/block" + "github.com/pactus-project/pactus/util/logger" +) + +type Server struct { + sockets map[string]zmq4.Socket + publishers []Publisher + config *Config + eventCh <-chan any + logger *logger.SubLogger +} + +func New(ctx context.Context, conf *Config, eventCh <-chan any) (*Server, error) { + svr := &Server{ + eventCh: eventCh, + logger: logger.NewSubLogger("_zmq", nil), + publishers: make([]Publisher, 0), + sockets: make(map[string]zmq4.Socket), + config: conf, + } + + publisherOpts := []zmq4.Option{ + zmq4.WithAutomaticReconnect(conf.ZmqAutomaticReconnect), + zmq4.WithDialerRetry(conf.ZmqDialerRetryTime), + zmq4.WithDialerMaxRetries(conf.ZmqDialerMaxRetries), + } + + makePublisher := func(addr string, newPublisher func(socket zmq4.Socket, logger *logger.SubLogger) Publisher) error { + if addr == "" { + return nil + } + + socket, ok := svr.sockets[addr] + if !ok { + socket = zmq4.NewPub(ctx, publisherOpts...) + + if err := socket.SetOption(zmq4.OptionHWM, conf.ZmqPubHWM); err != nil { + return err + } + + if err := socket.Listen(addr); err != nil { + return err + } + } + + pub := newPublisher(socket, svr.logger) + svr.publishers = append(svr.publishers, pub) + svr.sockets[addr] = socket + + svr.logger.Info("publisher initialized", "topic", pub.TopicName(), "socket", addr) + + return nil + } + + if err := makePublisher(conf.ZmqPubBlockInfo, newBlockInfoPub); err != nil { + return nil, err + } + + if err := makePublisher(conf.ZmqPubTxInfo, newTxInfoPub); err != nil { + return nil, err + } + + if err := makePublisher(conf.ZmqPubRawBlock, newRawBlockPub); err != nil { + return nil, err + } + + if err := makePublisher(conf.ZmqPubRawTx, newRawTxPub); err != nil { + return nil, err + } + + go svr.receivedEventLoop(ctx) + + return svr, nil +} + +func (s *Server) Publishers() []Publisher { + return s.publishers +} + +func (s *Server) HWM() int { + return s.config.ZmqPubHWM +} + +func (s *Server) Close() { + for _, sock := range s.sockets { + if err := sock.Close(); err != nil { + s.logger.Error("failed to close socket", "err", err) + } + } +} + +func (s *Server) receivedEventLoop(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case event, ok := <-s.eventCh: + if !ok { + s.logger.Warn("event channel closed") + + return + } + + switch ev := event.(type) { + case *block.Block: + for _, pub := range s.publishers { + pub.onNewBlock(ev) + } + default: + panic("invalid event type") + } + } + } +} diff --git a/www/zmq/server_test.go b/www/zmq/server_test.go new file mode 100644 index 000000000..b8ad269ac --- /dev/null +++ b/www/zmq/server_test.go @@ -0,0 +1,110 @@ +package zmq + +import ( + "context" + "fmt" + "testing" + + "github.com/pactus-project/pactus/state" + "github.com/pactus-project/pactus/util/testsuite" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type testData struct { + *testsuite.TestSuite + + mockState *state.MockState + server *Server + eventCh chan any +} + +func setup(t *testing.T) *testData { + t.Helper() + + ts := testsuite.NewTestSuite(t) + mockState := state.MockingState(ts) + + return &testData{ + TestSuite: ts, + mockState: mockState, + } +} + +func (ts *testData) initServer(ctx context.Context, conf *Config) error { + eventCh := make(chan any) + sv, err := New(ctx, conf, eventCh) + if err != nil { + return err + } + + ts.server = sv + ts.eventCh = eventCh + + return nil +} + +func (ts *testData) resetServer() { + ts.server = nil + ts.eventCh = nil +} + +func (ts *testData) cleanup() func() { + return func() { + ts.server.Close() + ts.resetServer() + } +} + +func TestServerWithDefaultConfig(t *testing.T) { + suite := setup(t) + + conf := DefaultConfig() + + err := suite.initServer(context.TODO(), conf) + t.Cleanup(suite.cleanup()) + + assert.NoError(t, err) + require.NotNil(t, suite.server) +} + +func TestTopicsWithSameSocket(t *testing.T) { + suite := setup(t) + t.Cleanup(suite.cleanup()) + + port := suite.FindFreePort() + addr := fmt.Sprintf("tcp://127.0.0.1:%d", port) + + conf := DefaultConfig() + conf.ZmqPubBlockInfo = addr + conf.ZmqPubTxInfo = addr + conf.ZmqPubRawBlock = addr + conf.ZmqPubRawTx = addr + + err := suite.initServer(context.TODO(), conf) + require.NoError(t, err) + + require.Len(t, suite.server.publishers, 4) + + expectedAddr := suite.server.publishers[0].Address() + + for _, pub := range suite.server.publishers { + require.Equal(t, expectedAddr, pub.Address(), "All publishers must have the same address") + } +} + +func TestTopicsWithDifferentSockets(t *testing.T) { + suite := setup(t) + t.Cleanup(suite.cleanup()) + + conf := DefaultConfig() + conf.ZmqPubBlockInfo = fmt.Sprintf("tcp://127.0.0.1:%d", suite.FindFreePort()) + conf.ZmqPubTxInfo = fmt.Sprintf("tcp://127.0.0.1:%d", suite.FindFreePort()) + conf.ZmqPubRawBlock = fmt.Sprintf("tcp://127.0.0.1:%d", suite.FindFreePort()) + conf.ZmqPubRawTx = fmt.Sprintf("tcp://127.0.0.1:%d", suite.FindFreePort()) + + err := suite.initServer(context.TODO(), conf) + require.NoError(t, err) + + require.Len(t, suite.server.publishers, 4) +} diff --git a/www/zmq/topic.go b/www/zmq/topic.go new file mode 100644 index 000000000..fb2411e74 --- /dev/null +++ b/www/zmq/topic.go @@ -0,0 +1,29 @@ +package zmq + +type Topic int16 + +const ( + BlockInfo Topic = 0x0001 + TransactionInfo Topic = 0x0002 + RawBlock Topic = 0x0003 + RawTransaction Topic = 0x0004 +) + +func (t Topic) String() string { + switch t { + case BlockInfo: + return "block_info" + + case TransactionInfo: + return "transaction_info" + + case RawBlock: + return "raw_block" + + case RawTransaction: + return "raw_transaction" + + default: + return "" + } +} diff --git a/www/zmq/tx_info_publisher.go b/www/zmq/tx_info_publisher.go new file mode 100644 index 000000000..e9e0ab0e9 --- /dev/null +++ b/www/zmq/tx_info_publisher.go @@ -0,0 +1,26 @@ +package zmq + +import ( + "github.com/go-zeromq/zmq4" + "github.com/pactus-project/pactus/types/block" + "github.com/pactus-project/pactus/util/logger" +) + +type txInfoPub struct { + basePub +} + +func newTxInfoPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher { + return &txInfoPub{ + basePub: basePub{ + topic: TransactionInfo, + zmqSocket: socket, + logger: logger, + }, + } +} + +func (*txInfoPub) onNewBlock(_ *block.Block) { + // TODO implement me + panic("implement me") +}