Skip to content

Commit

Permalink
feat: add block info publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
Ja7ad committed Jan 13, 2025
1 parent 05441bd commit a217a24
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 4 deletions.
3 changes: 2 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ func (n *Node) Stop() {
// Wait for network to stop
time.Sleep(1 * time.Second)

close(n.eventCh)
n.consMgr.Stop()
n.sync.Stop()
n.state.Close()
Expand All @@ -176,6 +175,8 @@ func (n *Node) Stop() {
n.http.StopServer()
n.jsonrpc.StopServer()
n.zeromq.Close()

close(n.eventCh)
}

// these methods are using by GUI.
Expand Down
28 changes: 25 additions & 3 deletions www/zmq/block_info_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
)

type blockInfoPub struct {
seqNo uint32
basePub
}

Expand All @@ -20,7 +21,28 @@ func newBlockInfoPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher {
}
}

func (*blockInfoPub) onNewBlock(_ *block.Block) {
// TODO implement me
panic("implement me")
func (b *blockInfoPub) onNewBlock(blk *block.Block) {
seq := b.seqNo + 1

rawMsg := makeTopicMsg(
b.topic,
blk.Header().ProposerAddress(),
blk.Header().UnixTime(),
uint16(len(blk.Transactions())),
blk.Height(),
seq,
)

message := zmq4.NewMsg(rawMsg)

if err := b.zmqSocket.Send(message); err != nil {
b.logger.Error("zmq publish message error", "err", err, "publisher", b.TopicName())
}

Check warning on line 40 in www/zmq/block_info_publisher.go

View check run for this annotation

Codecov / codecov/patch

www/zmq/block_info_publisher.go#L39-L40

Added lines #L39 - L40 were not covered by tests

b.logger.Debug("zmq published message success",
"publisher", b.TopicName(),
"block_height", blk.Height(),
)

b.seqNo = seq
}
67 changes: 67 additions & 0 deletions www/zmq/block_info_publisher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package zmq

import (
"context"
"encoding/binary"
"fmt"
"testing"
"time"

"github.com/go-zeromq/zmq4"
"github.com/stretchr/testify/require"
)

func TestBlockInfoPublisher(t *testing.T) {
td := setup(t)
t.Cleanup(td.cleanup())

port := td.FindFreePort()
addr := fmt.Sprintf("tcp://localhost:%d", port)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

conf := DefaultConfig()
conf.ZmqPubBlockInfo = addr

err := td.initServer(ctx, conf)
require.NoError(t, err)

sub := zmq4.NewSub(ctx)
defer func() {
_ = sub.Close()
}()

err = sub.Dial(addr)
require.NoError(t, err)

err = sub.SetOption(zmq4.OptionSubscribe, string(BlockInfo.Bytes()))
require.NoError(t, err)

blk, _ := td.TestSuite.GenerateTestBlock(td.RandHeight())

td.eventCh <- blk

received, err := sub.Recv()
require.NoError(t, err)

require.NotNil(t, received.Frames)
require.GreaterOrEqual(t, len(received.Frames), 1)

msg := received.Frames[0]
require.Len(t, msg, 37)

topic := msg[:2]
proposerBytes := msg[2:23]
timestamp := binary.BigEndian.Uint32(msg[23:27])
txCount := binary.BigEndian.Uint16(msg[27:29])
height := binary.BigEndian.Uint32(msg[29:33])
seqNo := binary.BigEndian.Uint32(msg[33:])

require.Equal(t, BlockInfo.Bytes(), topic)
require.Equal(t, blk.Header().ProposerAddress().Bytes(), proposerBytes)
require.Equal(t, blk.Header().UnixTime(), timestamp)
require.Equal(t, uint16(len(blk.Transactions())), txCount)
require.Equal(t, blk.Height(), height)
require.Equal(t, uint32(1), seqNo)
}
9 changes: 9 additions & 0 deletions www/zmq/topic.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package zmq

import "encoding/binary"

type Topic int16

const (
Expand Down Expand Up @@ -27,3 +29,10 @@ func (t Topic) String() string {
return ""
}
}

func (t Topic) Bytes() []byte {
b := make([]byte, 2)
binary.BigEndian.PutUint16(b, uint16(t))

return b
}
30 changes: 30 additions & 0 deletions www/zmq/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package zmq

import (
"encoding/binary"

"github.com/pactus-project/pactus/crypto"
)

func makeTopicMsg(parts ...any) []byte {
result := make([]byte, 0, 64)

for _, part := range parts {
switch castedVal := part.(type) {
case crypto.Address:
result = append(result, castedVal.Bytes()...)
case Topic:
result = append(result, castedVal.Bytes()...)
case []byte:
result = append(result, castedVal...)
case uint32:
result = binary.BigEndian.AppendUint32(result, castedVal)
case uint16:
result = binary.BigEndian.AppendUint16(result, castedVal)
default:
panic("implement me!!")
}
}

return result
}
74 changes: 74 additions & 0 deletions www/zmq/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package zmq

import (
"encoding/binary"
"testing"

"github.com/stretchr/testify/assert"
)

func TestMakeTopicMsg(t *testing.T) {
td := setup(t)

_, addr := td.TestSuite.GenerateTestAccount()
randHeight := td.TestSuite.RandHeight()

tests := []struct {
name string
parts []any
want []byte
wantPanic bool
}{
{
name: "single crypto.Address",
parts: []any{addr},
want: addr.Bytes(),
},
{
name: "single Topic",
parts: []any{BlockInfo},
want: BlockInfo.Bytes(),
},
{
name: "uint32 value",
parts: []any{randHeight},
want: binary.BigEndian.AppendUint32([]byte{}, randHeight),
},
{
name: "uint16 value",
parts: []any{uint16(0x0506)},
want: binary.BigEndian.AppendUint16([]byte{}, 0x0506),
},
{
name: "multiple types",
parts: []any{addr, BlockInfo, uint32(0x0A0B0C0D), []byte{0x0E}},
want: func() []byte {
b := addr.Bytes()
b = append(b, BlockInfo.Bytes()...)
b = binary.BigEndian.AppendUint32(b, 0x0A0B0C0D)
b = append(b, 0x0E)

return b
}(),
},
{
name: "unknown type",
parts: []any{"unknown"},
wantPanic: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.wantPanic {
assert.Panics(t, func() {
makeTopicMsg(tt.parts...)
})

return
}
got := makeTopicMsg(tt.parts...)
assert.Equal(t, tt.want, got)
})
}
}

0 comments on commit a217a24

Please sign in to comment.