Skip to content

Commit

Permalink
feat(zeromq): add block info publisher (#1666)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ja7ad authored Jan 16, 2025
1 parent d0e66e5 commit 10d6a0d
Show file tree
Hide file tree
Showing 16 changed files with 225 additions and 158 deletions.
3 changes: 2 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ func (n *Node) Stop() {
// Wait for network to stop
time.Sleep(1 * time.Second)

close(n.eventCh)
n.consMgr.Stop()
n.sync.Stop()
n.state.Close()
Expand All @@ -176,6 +175,8 @@ func (n *Node) Stop() {
n.http.StopServer()
n.jsonrpc.StopServer()
n.zeromq.Close()

close(n.eventCh)
}

// these methods are using by GUI.
Expand Down
5 changes: 4 additions & 1 deletion state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,5 +752,8 @@ func (st *state) publishEvent(msg any) {
return
}

st.eventCh <- msg
select {
case st.eventCh <- msg:
default:
}
}
2 changes: 1 addition & 1 deletion util/testsuite/testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,7 @@ func (*TestSuite) HelperSignTransaction(prv crypto.PrivateKey, trx *tx.Tx) {
trx.SetPublicKey(prv.PublicKey())
}

func (*TestSuite) FindFreePort() int {
func FindFreePort() int {
listener, _ := net.Listen("tcp", "localhost:0")
defer func() {
_ = listener.Close()
Expand Down
26 changes: 0 additions & 26 deletions www/zmq/block_info_publisher.go

This file was deleted.

17 changes: 6 additions & 11 deletions www/zmq/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"net/url"
"strings"
"time"
)

type Config struct {
Expand All @@ -14,19 +13,15 @@ type Config struct {
ZmqPubRawBlock string `toml:"zmqpubrawblock"`
ZmqPubRawTx string `toml:"zmqpubrawtx"`
ZmqPubHWM int `toml:"zmqpubhwm"`

// Private config
ZmqAutomaticReconnect bool `toml:"-"`
ZmqDialerRetryTime time.Duration `toml:"-"`
ZmqDialerMaxRetries int `toml:"-"`
}

func DefaultConfig() *Config {
return &Config{
ZmqAutomaticReconnect: true,
ZmqDialerMaxRetries: 10,
ZmqDialerRetryTime: 250 * time.Millisecond,
ZmqPubHWM: 1000,
ZmqPubBlockInfo: "",
ZmqPubTxInfo: "",
ZmqPubRawBlock: "",
ZmqPubRawTx: "",
ZmqPubHWM: 1000,
}
}

Expand Down Expand Up @@ -65,7 +60,7 @@ func (c *Config) BasicCheck() error {
func validateTopicSocket(socket string) error {
addr, err := url.Parse(socket)
if err != nil {
return errors.New("failed to parse ZmqPub value: " + err.Error())
return errors.New("failed to parse URL: " + err.Error())
}

if addr.Scheme != "tcp" {
Expand Down
12 changes: 6 additions & 6 deletions www/zmq/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestDefaultConfig(t *testing.T) {
}

func TestBasicCheck(t *testing.T) {
testCases := []struct {
tests := []struct {
name string
config *Config
expectErr bool
Expand Down Expand Up @@ -51,7 +51,7 @@ func TestBasicCheck(t *testing.T) {
{
name: "Empty host",
config: &Config{
ZmqPubBlockInfo: "tcp://:28332",
ZmqPubTxInfo: "tcp://:28332",
},
expectErr: true,
},
Expand All @@ -69,10 +69,10 @@ func TestBasicCheck(t *testing.T) {
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := tc.config.BasicCheck()
if tc.expectErr {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.config.BasicCheck()
if tt.expectErr {
assert.Error(t, err, "BasicCheck should return an error")
} else {
assert.NoError(t, err, "BasicCheck should not return an error")
Expand Down
44 changes: 44 additions & 0 deletions www/zmq/publisher.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
package zmq

import (
"encoding/binary"

"github.com/go-zeromq/zmq4"
"github.com/pactus-project/pactus/crypto"
"github.com/pactus-project/pactus/types/block"
"github.com/pactus-project/pactus/util/logger"
)

type Publisher interface {
Address() string
TopicName() string
HWM() int

onNewBlock(blk *block.Block)
}

type basePub struct {
topic Topic
seqNo uint32
zmqSocket zmq4.Socket
logger *logger.SubLogger
}
Expand All @@ -26,3 +31,42 @@ func (b *basePub) Address() string {
func (b *basePub) TopicName() string {
return b.topic.String()
}

func (b *basePub) HWM() int {
hwmOpt, _ := b.zmqSocket.GetOption(zmq4.OptionHWM)

return hwmOpt.(int)
}

// makeTopicMsg constructs a ZMQ message with a topic ID, message body, and sequence number.
// The message is constructed as a byte slice with the following structure:
// - Topic ID (2 Bytes)
// - Message body (varies based on provided parts)
// - Sequence number (4 Bytes).
func (b *basePub) makeTopicMsg(parts ...any) []byte {
result := make([]byte, 0, 64)

// Append Topic ID to the message (2 Bytes)
result = append(result, b.topic.Bytes()...)

// Append message body based on the provided parts
for _, part := range parts {
switch castedVal := part.(type) {
case crypto.Address:
result = append(result, castedVal.Bytes()...)
case []byte:
result = append(result, castedVal...)
case uint32:
result = binary.BigEndian.AppendUint32(result, castedVal)
case uint16:
result = binary.BigEndian.AppendUint16(result, castedVal)
default:
panic("implement me!!")
}
}

// Append sequence number to the message (4 Bytes, Big Endian encoding)
result = binary.BigEndian.AppendUint32(result, b.seqNo)

return result
}
43 changes: 43 additions & 0 deletions www/zmq/publisher_block_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package zmq

import (
"github.com/go-zeromq/zmq4"
"github.com/pactus-project/pactus/types/block"
"github.com/pactus-project/pactus/util/logger"
)

type blockInfoPub struct {
basePub
}

func newBlockInfoPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher {
return &blockInfoPub{
basePub: basePub{
topic: TopicBlockInfo,
seqNo: 0,
zmqSocket: socket,
logger: logger,
},
}
}

func (b *blockInfoPub) onNewBlock(blk *block.Block) {
rawMsg := b.makeTopicMsg(
blk.Header().ProposerAddress(),
blk.Header().UnixTime(),
uint16(len(blk.Transactions())),
blk.Height(),
)

message := zmq4.NewMsg(rawMsg)

if err := b.zmqSocket.Send(message); err != nil {
b.logger.Error("zmq publish message error", "err", err, "publisher", b.TopicName())
}

b.logger.Debug("zmq published message success",
"publisher", b.TopicName(),
"block_height", blk.Height())

b.seqNo++
}
61 changes: 61 additions & 0 deletions www/zmq/publisher_block_info_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package zmq

import (
"context"
"encoding/binary"
"fmt"
"testing"

"github.com/go-zeromq/zmq4"
"github.com/pactus-project/pactus/util/testsuite"
"github.com/stretchr/testify/require"
)

func TestBlockInfoPublisher(t *testing.T) {
port := testsuite.FindFreePort()
addr := fmt.Sprintf("tcp://localhost:%d", port)
conf := DefaultConfig()
conf.ZmqPubBlockInfo = addr

td := setup(t, conf)
defer td.closeServer()

td.server.Publishers()

sub := zmq4.NewSub(context.TODO(), zmq4.WithAutomaticReconnect(false))

err := sub.Dial(addr)
require.NoError(t, err)

err = sub.SetOption(zmq4.OptionSubscribe, string(TopicBlockInfo.Bytes()))
require.NoError(t, err)

blk, _ := td.TestSuite.GenerateTestBlock(td.RandHeight())

td.eventCh <- blk

received, err := sub.Recv()
require.NoError(t, err)

require.NotNil(t, received.Frames)
require.GreaterOrEqual(t, len(received.Frames), 1)

msg := received.Frames[0]
require.Len(t, msg, 37)

topic := msg[:2]
proposerBytes := msg[2:23]
timestamp := binary.BigEndian.Uint32(msg[23:27])
txCount := binary.BigEndian.Uint16(msg[27:29])
height := binary.BigEndian.Uint32(msg[29:33])
seqNo := binary.BigEndian.Uint32(msg[33:])

require.Equal(t, TopicBlockInfo.Bytes(), topic)
require.Equal(t, blk.Header().ProposerAddress().Bytes(), proposerBytes)
require.Equal(t, blk.Header().UnixTime(), timestamp)
require.Equal(t, uint16(len(blk.Transactions())), txCount)
require.Equal(t, blk.Height(), height)
require.Equal(t, uint32(0), seqNo)

require.NoError(t, sub.Close())
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type rawBlockPub struct {
func newRawBlockPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher {
return &rawBlockPub{
basePub: basePub{
topic: RawBlock,
topic: TopicRawBlock,
zmqSocket: socket,
logger: logger,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type rawTxPub struct {
func newRawTxPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher {
return &rawTxPub{
basePub: basePub{
topic: RawTransaction,
topic: TopicRawTransaction,
zmqSocket: socket,
logger: logger,
},
Expand Down
29 changes: 0 additions & 29 deletions www/zmq/publisher_test.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type txInfoPub struct {
func newTxInfoPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher {
return &txInfoPub{
basePub: basePub{
topic: TransactionInfo,
topic: TopicTransactionInfo,
zmqSocket: socket,
logger: logger,
},
Expand Down
Loading

0 comments on commit 10d6a0d

Please sign in to comment.