From a217a24c696c392206a5f37debca2c9f03671ac1 Mon Sep 17 00:00:00 2001 From: Javad Date: Mon, 13 Jan 2025 17:35:31 +0330 Subject: [PATCH] feat: add block info publisher --- node/node.go | 3 +- www/zmq/block_info_publisher.go | 28 +++++++++-- www/zmq/block_info_publisher_test.go | 67 +++++++++++++++++++++++++ www/zmq/topic.go | 9 ++++ www/zmq/util.go | 30 +++++++++++ www/zmq/util_test.go | 74 ++++++++++++++++++++++++++++ 6 files changed, 207 insertions(+), 4 deletions(-) create mode 100644 www/zmq/block_info_publisher_test.go create mode 100644 www/zmq/util.go create mode 100644 www/zmq/util_test.go diff --git a/node/node.go b/node/node.go index 50561d2bf..ae8e4fc5d 100644 --- a/node/node.go +++ b/node/node.go @@ -167,7 +167,6 @@ func (n *Node) Stop() { // Wait for network to stop time.Sleep(1 * time.Second) - close(n.eventCh) n.consMgr.Stop() n.sync.Stop() n.state.Close() @@ -176,6 +175,8 @@ func (n *Node) Stop() { n.http.StopServer() n.jsonrpc.StopServer() n.zeromq.Close() + + close(n.eventCh) } // these methods are using by GUI. diff --git a/www/zmq/block_info_publisher.go b/www/zmq/block_info_publisher.go index 3b39b8248..5e6e4d5a7 100644 --- a/www/zmq/block_info_publisher.go +++ b/www/zmq/block_info_publisher.go @@ -7,6 +7,7 @@ import ( ) type blockInfoPub struct { + seqNo uint32 basePub } @@ -20,7 +21,28 @@ func newBlockInfoPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher { } } -func (*blockInfoPub) onNewBlock(_ *block.Block) { - // TODO implement me - panic("implement me") +func (b *blockInfoPub) onNewBlock(blk *block.Block) { + seq := b.seqNo + 1 + + rawMsg := makeTopicMsg( + b.topic, + blk.Header().ProposerAddress(), + blk.Header().UnixTime(), + uint16(len(blk.Transactions())), + blk.Height(), + seq, + ) + + message := zmq4.NewMsg(rawMsg) + + if err := b.zmqSocket.Send(message); err != nil { + b.logger.Error("zmq publish message error", "err", err, "publisher", b.TopicName()) + } + + b.logger.Debug("zmq published message success", + "publisher", b.TopicName(), + "block_height", blk.Height(), + ) + + b.seqNo = seq } diff --git a/www/zmq/block_info_publisher_test.go b/www/zmq/block_info_publisher_test.go new file mode 100644 index 000000000..dd2e3f6a4 --- /dev/null +++ b/www/zmq/block_info_publisher_test.go @@ -0,0 +1,67 @@ +package zmq + +import ( + "context" + "encoding/binary" + "fmt" + "testing" + "time" + + "github.com/go-zeromq/zmq4" + "github.com/stretchr/testify/require" +) + +func TestBlockInfoPublisher(t *testing.T) { + td := setup(t) + t.Cleanup(td.cleanup()) + + port := td.FindFreePort() + addr := fmt.Sprintf("tcp://localhost:%d", port) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + conf := DefaultConfig() + conf.ZmqPubBlockInfo = addr + + err := td.initServer(ctx, conf) + require.NoError(t, err) + + sub := zmq4.NewSub(ctx) + defer func() { + _ = sub.Close() + }() + + err = sub.Dial(addr) + require.NoError(t, err) + + err = sub.SetOption(zmq4.OptionSubscribe, string(BlockInfo.Bytes())) + require.NoError(t, err) + + blk, _ := td.TestSuite.GenerateTestBlock(td.RandHeight()) + + td.eventCh <- blk + + received, err := sub.Recv() + require.NoError(t, err) + + require.NotNil(t, received.Frames) + require.GreaterOrEqual(t, len(received.Frames), 1) + + msg := received.Frames[0] + require.Len(t, msg, 37) + + topic := msg[:2] + proposerBytes := msg[2:23] + timestamp := binary.BigEndian.Uint32(msg[23:27]) + txCount := binary.BigEndian.Uint16(msg[27:29]) + height := binary.BigEndian.Uint32(msg[29:33]) + seqNo := binary.BigEndian.Uint32(msg[33:]) + + require.Equal(t, BlockInfo.Bytes(), topic) + require.Equal(t, blk.Header().ProposerAddress().Bytes(), proposerBytes) + require.Equal(t, blk.Header().UnixTime(), timestamp) + require.Equal(t, uint16(len(blk.Transactions())), txCount) + require.Equal(t, blk.Height(), height) + require.Equal(t, uint32(1), seqNo) +} diff --git a/www/zmq/topic.go b/www/zmq/topic.go index fb2411e74..f9c66b145 100644 --- a/www/zmq/topic.go +++ b/www/zmq/topic.go @@ -1,5 +1,7 @@ package zmq +import "encoding/binary" + type Topic int16 const ( @@ -27,3 +29,10 @@ func (t Topic) String() string { return "" } } + +func (t Topic) Bytes() []byte { + b := make([]byte, 2) + binary.BigEndian.PutUint16(b, uint16(t)) + + return b +} diff --git a/www/zmq/util.go b/www/zmq/util.go new file mode 100644 index 000000000..f64832a6a --- /dev/null +++ b/www/zmq/util.go @@ -0,0 +1,30 @@ +package zmq + +import ( + "encoding/binary" + + "github.com/pactus-project/pactus/crypto" +) + +func makeTopicMsg(parts ...any) []byte { + result := make([]byte, 0, 64) + + for _, part := range parts { + switch castedVal := part.(type) { + case crypto.Address: + result = append(result, castedVal.Bytes()...) + case Topic: + result = append(result, castedVal.Bytes()...) + case []byte: + result = append(result, castedVal...) + case uint32: + result = binary.BigEndian.AppendUint32(result, castedVal) + case uint16: + result = binary.BigEndian.AppendUint16(result, castedVal) + default: + panic("implement me!!") + } + } + + return result +} diff --git a/www/zmq/util_test.go b/www/zmq/util_test.go new file mode 100644 index 000000000..08e4ef470 --- /dev/null +++ b/www/zmq/util_test.go @@ -0,0 +1,74 @@ +package zmq + +import ( + "encoding/binary" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMakeTopicMsg(t *testing.T) { + td := setup(t) + + _, addr := td.TestSuite.GenerateTestAccount() + randHeight := td.TestSuite.RandHeight() + + tests := []struct { + name string + parts []any + want []byte + wantPanic bool + }{ + { + name: "single crypto.Address", + parts: []any{addr}, + want: addr.Bytes(), + }, + { + name: "single Topic", + parts: []any{BlockInfo}, + want: BlockInfo.Bytes(), + }, + { + name: "uint32 value", + parts: []any{randHeight}, + want: binary.BigEndian.AppendUint32([]byte{}, randHeight), + }, + { + name: "uint16 value", + parts: []any{uint16(0x0506)}, + want: binary.BigEndian.AppendUint16([]byte{}, 0x0506), + }, + { + name: "multiple types", + parts: []any{addr, BlockInfo, uint32(0x0A0B0C0D), []byte{0x0E}}, + want: func() []byte { + b := addr.Bytes() + b = append(b, BlockInfo.Bytes()...) + b = binary.BigEndian.AppendUint32(b, 0x0A0B0C0D) + b = append(b, 0x0E) + + return b + }(), + }, + { + name: "unknown type", + parts: []any{"unknown"}, + wantPanic: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.wantPanic { + assert.Panics(t, func() { + makeTopicMsg(tt.parts...) + }) + + return + } + got := makeTopicMsg(tt.parts...) + assert.Equal(t, tt.want, got) + }) + } +}