From 7156123ae4d71a30da29c89d5e0c3820adc721f2 Mon Sep 17 00:00:00 2001 From: Javad Date: Sat, 11 Jan 2025 09:25:01 +0330 Subject: [PATCH 1/2] chore: remove nanomsg --- .github/workflows/semantic-pr.yml | 1 - config/config.go | 12 ---- config/config_test.go | 8 --- config/example_config.toml | 10 ---- consensus/consensus_test.go | 10 ++-- go.mod | 2 - go.sum | 7 --- node/node.go | 17 +----- node/node_test.go | 2 - state/state.go | 33 +---------- state/state_test.go | 4 +- www/nanomsg/config.go | 18 ------ www/nanomsg/event/event.go | 58 ------------------ www/nanomsg/event/event_test.go | 41 ------------- www/nanomsg/server.go | 98 ------------------------------- 15 files changed, 9 insertions(+), 312 deletions(-) delete mode 100644 www/nanomsg/config.go delete mode 100644 www/nanomsg/event/event.go delete mode 100644 www/nanomsg/event/event_test.go delete mode 100644 www/nanomsg/server.go diff --git a/.github/workflows/semantic-pr.yml b/.github/workflows/semantic-pr.yml index 6c65462ff..c9d8a4b23 100644 --- a/.github/workflows/semantic-pr.yml +++ b/.github/workflows/semantic-pr.yml @@ -59,7 +59,6 @@ jobs: proto http jsonrpc - nanomsg zeromq windows linux diff --git a/config/config.go b/config/config.go index e6bb64a73..fff0f672b 100644 --- a/config/config.go +++ b/config/config.go @@ -19,7 +19,6 @@ import ( "github.com/pactus-project/pactus/www/grpc" "github.com/pactus-project/pactus/www/http" "github.com/pactus-project/pactus/www/jsonrpc" - "github.com/pactus-project/pactus/www/nanomsg" "github.com/pactus-project/pactus/www/zmq" "github.com/pelletier/go-toml/v2" ) @@ -47,7 +46,6 @@ type Config struct { JSONRPC *jsonrpc.Config `toml:"jsonrpc"` HTTP *http.Config `toml:"http"` WalletManager *wallet.Config `toml:"-"` - Nanomsg *nanomsg.Config `toml:"nanomsg"` ZeroMq *zmq.Config `toml:"zeromq"` } @@ -100,7 +98,6 @@ func defaultConfig() *Config { GRPC: grpc.DefaultConfig(), JSONRPC: jsonrpc.DefaultConfig(), HTTP: http.DefaultConfig(), - Nanomsg: nanomsg.DefaultConfig(), ZeroMq: zmq.DefaultConfig(), WalletManager: wallet.DefaultConfig(), } @@ -156,8 +153,6 @@ func DefaultConfigMainnet() *Config { conf.HTTP.Enable = false conf.HTTP.Listen = "127.0.0.1:80" conf.HTTP.EnablePprof = false - conf.Nanomsg.Enable = false - conf.Nanomsg.Listen = "tcp://127.0.0.1:40899" return conf } @@ -192,8 +187,6 @@ func DefaultConfigTestnet() *Config { conf.HTTP.Enable = false conf.HTTP.Listen = "[::]:80" conf.HTTP.EnablePprof = false - conf.Nanomsg.Enable = false - conf.Nanomsg.Listen = "tcp://[::]:40799" return conf } @@ -220,8 +213,6 @@ func DefaultConfigLocalnet() *Config { conf.HTTP.Enable = true conf.HTTP.Listen = "[::]:0" conf.HTTP.EnablePprof = true - conf.Nanomsg.Enable = true - conf.Nanomsg.Listen = "tcp://[::]:40799" conf.ZeroMq.ZmqPubBlockInfo = "tcp://127.0.0.1:28332" conf.ZeroMq.ZmqPubTxInfo = "tcp://127.0.0.1:28333" conf.ZeroMq.ZmqPubRawBlock = "tcp://127.0.0.1:28334" @@ -295,9 +286,6 @@ func (conf *Config) BasicCheck() error { if err := conf.Sync.BasicCheck(); err != nil { return err } - if err := conf.Nanomsg.BasicCheck(); err != nil { - return err - } if err := conf.JSONRPC.BasicCheck(); err != nil { return err } diff --git a/config/config_test.go b/config/config_test.go index 3315d3750..c1596974c 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -48,12 +48,10 @@ func TestDefaultConfig(t *testing.T) { assert.False(t, conf.GRPC.Enable) assert.False(t, conf.GRPC.Gateway.Enable) assert.False(t, conf.HTTP.Enable) - assert.False(t, conf.Nanomsg.Enable) assert.Zero(t, conf.GRPC.Listen) assert.Zero(t, conf.GRPC.Gateway.Listen) assert.Zero(t, conf.HTTP.Listen) - assert.Zero(t, conf.Nanomsg.Listen) } func TestMainnetConfig(t *testing.T) { @@ -67,12 +65,10 @@ func TestMainnetConfig(t *testing.T) { assert.True(t, conf.GRPC.Enable) assert.False(t, conf.GRPC.Gateway.Enable) assert.False(t, conf.HTTP.Enable) - assert.False(t, conf.Nanomsg.Enable) assert.Equal(t, "127.0.0.1:50051", conf.GRPC.Listen) assert.Equal(t, "127.0.0.1:8080", conf.GRPC.Gateway.Listen) assert.Equal(t, "127.0.0.1:80", conf.HTTP.Listen) - assert.Equal(t, "tcp://127.0.0.1:40899", conf.Nanomsg.Listen) } func TestTestnetConfig(t *testing.T) { @@ -86,12 +82,10 @@ func TestTestnetConfig(t *testing.T) { assert.True(t, conf.GRPC.Enable) assert.True(t, conf.GRPC.Gateway.Enable) assert.False(t, conf.HTTP.Enable) - assert.False(t, conf.Nanomsg.Enable) assert.Equal(t, "[::]:50052", conf.GRPC.Listen) assert.Equal(t, "[::]:8080", conf.GRPC.Gateway.Listen) assert.Equal(t, "[::]:80", conf.HTTP.Listen) - assert.Equal(t, "tcp://[::]:40799", conf.Nanomsg.Listen) } func TestLocalnetConfig(t *testing.T) { @@ -105,12 +99,10 @@ func TestLocalnetConfig(t *testing.T) { assert.True(t, conf.GRPC.Enable) assert.True(t, conf.GRPC.Gateway.Enable) assert.True(t, conf.HTTP.Enable) - assert.True(t, conf.Nanomsg.Enable) assert.Equal(t, "[::]:50052", conf.GRPC.Listen) assert.Equal(t, "[::]:8080", conf.GRPC.Gateway.Listen) assert.Equal(t, "[::]:0", conf.HTTP.Listen) - assert.Equal(t, "tcp://[::]:40799", conf.Nanomsg.Listen) } func TestLoadFromFile(t *testing.T) { diff --git a/config/example_config.toml b/config/example_config.toml index f01b25dc6..f04cb5cb0 100644 --- a/config/example_config.toml +++ b/config/example_config.toml @@ -238,16 +238,6 @@ # data, so enable only in secure environments with restricted access. enable_pprof = false -# Nanomsg configuration. -[nanomsg] - - # `enable` indicates whether nanomsg service should be enabled or not. - # Default is `false`. - enable = false - - # `listen` is the address for incoming connections to the nanomsg server. - listen = 'tcp://127.0.0.1:40899' - # ZeroMQ configuration. [zeromq] diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index 0d8adae12..fcd5bf1a4 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -94,16 +94,16 @@ func setupWithSeed(t *testing.T, seed int64) *testData { Add(time.Duration(params.BlockIntervalInSecond) * time.Second) genDoc := genesis.MakeGenesis(getTime, accs, vals, params) stateX, err := state.LoadOrNewState(genDoc, []*bls.ValidatorKey{valKeys[tIndexX]}, - store.MockingStore(ts), txPool, nil) + store.MockingStore(ts), txPool) require.NoError(t, err) stateY, err := state.LoadOrNewState(genDoc, []*bls.ValidatorKey{valKeys[tIndexY]}, - store.MockingStore(ts), txPool, nil) + store.MockingStore(ts), txPool) require.NoError(t, err) stateB, err := state.LoadOrNewState(genDoc, []*bls.ValidatorKey{valKeys[tIndexB]}, - store.MockingStore(ts), txPool, nil) + store.MockingStore(ts), txPool) require.NoError(t, err) stateP, err := state.LoadOrNewState(genDoc, []*bls.ValidatorKey{valKeys[tIndexP]}, - store.MockingStore(ts), txPool, nil) + store.MockingStore(ts), txPool) require.NoError(t, err) consMessages := make([]consMessage, 0) @@ -451,7 +451,7 @@ func TestNotInCommittee(t *testing.T) { valKey := td.RandValKey() store := store.MockingStore(td.TestSuite) - state, _ := state.LoadOrNewState(td.genDoc, []*bls.ValidatorKey{valKey}, store, td.txPool, nil) + state, _ := state.LoadOrNewState(td.genDoc, []*bls.ValidatorKey{valKey}, store, td.txPool) consInt := NewConsensus(testConfig(), state, valKey, valKey.Address(), make(chan message.Message, 100), newConcreteMediator()) cons := consInt.(*consensus) diff --git a/go.mod b/go.mod index 7ef1a78e6..44d8610b3 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,6 @@ require ( github.com/stretchr/testify v1.10.0 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 github.com/tyler-smith/go-bip39 v1.1.0 - go.nanomsg.org/mangos/v3 v3.4.2 golang.org/x/crypto v0.31.0 golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 golang.org/x/term v0.27.0 @@ -47,7 +46,6 @@ require ( ) require ( - github.com/Microsoft/go-winio v0.6.2 // indirect github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.20.0 // indirect diff --git a/go.sum b/go.sum index a63aa3964..7b3bb0260 100644 --- a/go.sum +++ b/go.sum @@ -8,9 +8,6 @@ dmitri.shuralyov.com/service/change v0.0.0-20181023043359-a85b471d5412/go.mod h1 dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D6DFvNNtx+9ybjezNCa8XF0xaYcETyp6rHWU= git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY= -github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= -github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/NathanBaulch/protoc-gen-cobra v1.2.1 h1:BOqX9glwicbqDJDGndMnhHhx8psGTSjGdZzRDY1a7A8= github.com/NathanBaulch/protoc-gen-cobra v1.2.1/go.mod h1:ZLPLEPQgV3jP3a7IEp+xxYPk8tF4lhY9ViV0hn6K3iA= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= @@ -101,7 +98,6 @@ github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4 github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= -github.com/gdamore/optopia v0.2.0/go.mod h1:YKYEwo5C1Pa617H7NlPcmQXl+vG6YnSSNB44n8dNL0Q= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= @@ -186,7 +182,6 @@ github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyE github.com/gorilla/handlers v1.5.2/go.mod h1:dX+xVpaxdSw+q0Qek8SSsl3dfMk3jNddUkMzo0GtH0w= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= -github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gotk3/gotk3 v0.6.2 h1:sx/PjaKfKULJPTPq8p2kn2ZbcNFxpOJqi4VLzMbEOO8= @@ -573,8 +568,6 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -go.nanomsg.org/mangos/v3 v3.4.2 h1:gHlopxjWvJcVCcUilQIsRQk9jdj6/HB7wrTiUN8Ki7Q= -go.nanomsg.org/mangos/v3 v3.4.2/go.mod h1:8+hjBMQub6HvXmuGvIq6hf19uxGQIjCofmc62lbedLA= go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= diff --git a/node/node.go b/node/node.go index 4c54525cc..91548fafb 100644 --- a/node/node.go +++ b/node/node.go @@ -21,8 +21,6 @@ import ( "github.com/pactus-project/pactus/www/grpc" "github.com/pactus-project/pactus/www/http" "github.com/pactus-project/pactus/www/jsonrpc" - "github.com/pactus-project/pactus/www/nanomsg" - "github.com/pactus-project/pactus/www/nanomsg/event" "github.com/pkg/errors" ) @@ -38,7 +36,6 @@ type Node struct { http *http.Server grpc *grpc.Server jsonrpc *jsonrpc.Server - nanomsg *nanomsg.Server } func NewNode(genDoc *genesis.Genesis, conf *config.Config, @@ -54,10 +51,6 @@ func NewNode(genDoc *genesis.Genesis, conf *config.Config, "network", chainType) messageCh := make(chan message.Message, 500) - eventCh := make(chan event.Event, 500) - if !conf.Nanomsg.Enable { - eventCh = nil - } store, err := store.NewStore(conf.Store) if err != nil { @@ -66,7 +59,7 @@ func NewNode(genDoc *genesis.Genesis, conf *config.Config, txPool := txpool.NewTxPool(conf.TxPool, store, messageCh) - state, err := state.LoadOrNewState(genDoc, valKeys, store, txPool, eventCh) + state, err := state.LoadOrNewState(genDoc, valKeys, store, txPool) if err != nil { return nil, err } @@ -94,7 +87,6 @@ func NewNode(genDoc *genesis.Genesis, conf *config.Config, grpcServer := grpc.NewServer(conf.GRPC, state, syn, net, consMgr, walletMgr) httpServer := http.NewServer(conf.HTTP, enableHTTPAuth) jsonrpcServer := jsonrpc.NewServer(conf.JSONRPC) - nanomsgServer := nanomsg.NewServer(conf.Nanomsg, eventCh) node := &Node{ config: conf, @@ -108,7 +100,6 @@ func NewNode(genDoc *genesis.Genesis, conf *config.Config, http: httpServer, grpc: grpcServer, jsonrpc: jsonrpcServer, - nanomsg: nanomsgServer, } return node, nil @@ -152,11 +143,6 @@ func (n *Node) Start() error { return errors.Wrap(err, "could not start JSON-RPC server") } - err = n.nanomsg.StartServer() - if err != nil { - return errors.Wrap(err, "could not start Nanomsg server") - } - return nil } @@ -175,7 +161,6 @@ func (n *Node) Stop() { n.grpc.StopServer() n.http.StopServer() n.jsonrpc.StopServer() - n.nanomsg.StopServer() } // these methods are using by GUI. diff --git a/node/node_test.go b/node/node_test.go index 40343064a..6dde27471 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -39,8 +39,6 @@ func TestRunningNode(t *testing.T) { conf.HTTP.Listen = "0.0.0.0:0" conf.JSONRPC.Enable = true conf.JSONRPC.Listen = "0.0.0.0:0" - conf.Nanomsg.Enable = true - conf.Nanomsg.Listen = "tcp://0.0.0.0:0" conf.Store.Path = util.TempDirPath() conf.Network.EnableRelay = false conf.Network.NetworkKey = util.TempFilePath() diff --git a/state/state.go b/state/state.go index 28aa678a6..e0503262e 100644 --- a/state/state.go +++ b/state/state.go @@ -31,7 +31,6 @@ import ( "github.com/pactus-project/pactus/util/logger" "github.com/pactus-project/pactus/util/persistentmerkle" "github.com/pactus-project/pactus/util/simplemerkle" - "github.com/pactus-project/pactus/www/nanomsg/event" ) type state struct { @@ -49,14 +48,13 @@ type state struct { validatorMerkle *persistentmerkle.Tree scoreMgr *score.Manager logger *logger.SubLogger - eventCh chan event.Event } func LoadOrNewState( genDoc *genesis.Genesis, valKeys []*bls.ValidatorKey, store store.Store, - txPool txpool.TxPool, eventCh chan event.Event, + txPool txpool.TxPool, ) (Facade, error) { state := &state{ valKeys: valKeys, @@ -67,7 +65,6 @@ func LoadOrNewState( lastInfo: lastinfo.NewLastInfo(), accountMerkle: persistentmerkle.New(), validatorMerkle: persistentmerkle.New(), - eventCh: eventCh, } state.logger = logger.NewSubLogger("_state", state) state.store = store @@ -459,10 +456,6 @@ func (st *state) CommitBlock(blk *block.Block, cert *certificate.BlockCertificat } } - // ----------------------------------- - // Publishing the events to the nano message. - st.publishEvents(height, blk) - return nil } @@ -716,30 +709,6 @@ func (st *state) Params() *param.Params { return st.params } -// publishEvents publishes block related events. -func (st *state) publishEvents(height uint32, blk *block.Block) { - if st.eventCh == nil { - return - } - blockEvent := event.CreateBlockEvent(blk.Hash(), height) - st.eventCh <- blockEvent - - for i := 1; i < blk.Transactions().Len(); i++ { - transaction := blk.Transactions().Get(i) - - senderChangeEvent := event.CreateAccountChangeEvent(transaction.Payload().Signer(), height) - st.eventCh <- senderChangeEvent - - if transaction.Payload().Receiver() != nil { - receiverChangeEvent := event.CreateAccountChangeEvent(*transaction.Payload().Receiver(), height) - st.eventCh <- receiverChangeEvent - } - - txEvent := event.CreateTransactionEvent(transaction.ID(), height) - st.eventCh <- txEvent - } -} - func (st *state) CalculateFee(amt amount.Amount, payloadType payload.Type) amount.Amount { return st.txPool.EstimatedFee(amt, payloadType) } diff --git a/state/state_test.go b/state/state_test.go index 0e701c838..6b6eb7b6d 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -74,7 +74,7 @@ func setup(t *testing.T) *testData { // First validator is in the committee valKeys := []*bls.ValidatorKey{genValKeys[0], ts.RandValKey()} - st1, err := LoadOrNewState(gnDoc, valKeys, mockStore, mockTxPool, nil) + st1, err := LoadOrNewState(gnDoc, valKeys, mockStore, mockTxPool) require.NoError(t, err) state, _ := st1.(*state) @@ -539,7 +539,7 @@ func TestLoadState(t *testing.T) { // Load last state info newState, err := LoadOrNewState(td.state.genDoc, td.state.valKeys, - td.state.store, td.commonTxPool, nil) + td.state.store, td.commonTxPool) require.NoError(t, err) assert.Equal(t, td.state.TotalAccounts(), newState.TotalAccounts()) diff --git a/www/nanomsg/config.go b/www/nanomsg/config.go deleted file mode 100644 index 50c316844..000000000 --- a/www/nanomsg/config.go +++ /dev/null @@ -1,18 +0,0 @@ -package nanomsg - -type Config struct { - Enable bool `toml:"enable"` - Listen string `toml:"listen"` -} - -func DefaultConfig() *Config { - return &Config{ - Enable: false, - Listen: "", - } -} - -// BasicCheck performs basic checks on the configuration. -func (*Config) BasicCheck() error { - return nil -} diff --git a/www/nanomsg/event/event.go b/www/nanomsg/event/event.go deleted file mode 100644 index 45ee601cb..000000000 --- a/www/nanomsg/event/event.go +++ /dev/null @@ -1,58 +0,0 @@ -package event - -import ( - "bytes" - - "github.com/pactus-project/pactus/crypto" - "github.com/pactus-project/pactus/crypto/hash" - "github.com/pactus-project/pactus/types/tx" - "github.com/pactus-project/pactus/util/encoding" - "github.com/pactus-project/pactus/util/logger" -) - -const ( - TopicBlock = uint16(0x0101) - TopicTransaction = uint16(0x0201) - TopicAccountChange = uint16(0x0301) -) - -type Event []byte - -// CreateBlockEvent creates an event when the new block is committed. -// The block event structure is like : -// . -func CreateBlockEvent(blockHash hash.Hash, height uint32) Event { - buf := bytes.NewBuffer(make([]byte, 0, 42)) - err := encoding.WriteElements(buf, TopicBlock, blockHash, height) - if err != nil { - logger.Error("error on encoding event in new block", "error", err) - } - - return buf.Bytes() -} - -// CreateTransactionEvent creates an event when a new transaction sent. -// The new transaction event structure is like : -// . -func CreateTransactionEvent(txHash tx.ID, height uint32) Event { - buf := bytes.NewBuffer(make([]byte, 0, 42)) - err := encoding.WriteElements(buf, TopicTransaction, txHash, height) - if err != nil { - logger.Error("error on encoding event in new transaction", "error", err) - } - - return buf.Bytes() -} - -// CreateAccountChangeEvent creates an event when the new account is created. -// The account event structure is like : -// . -func CreateAccountChangeEvent(accountAddr crypto.Address, height uint32) Event { - buf := bytes.NewBuffer(make([]byte, 0, 42)) - err := encoding.WriteElements(buf, TopicAccountChange, accountAddr, height) - if err != nil { - logger.Error("error on encoding event in new account", "error", err) - } - - return buf.Bytes() -} diff --git a/www/nanomsg/event/event_test.go b/www/nanomsg/event/event_test.go deleted file mode 100644 index eec2bc1ee..000000000 --- a/www/nanomsg/event/event_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package event - -import ( - "testing" - - "github.com/pactus-project/pactus/crypto" - "github.com/pactus-project/pactus/crypto/hash" - "github.com/stretchr/testify/assert" -) - -func TestCreateBlockEvent(t *testing.T) { - h, _ := hash.FromString("000102030405060708090a0b0c0d0e0f000102030405060708090a0b0c0d0e0f") - height := uint32(0x2134) - e := CreateBlockEvent(h, height) - assert.Equal(t, Event{ - 0x1, 0x1, 0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, - 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, - 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0x34, 0x21, 0x0, 0x0, - }, e) -} - -func TestCreateNewTransactionEvent(t *testing.T) { - h, _ := hash.FromString("000102030405060708090a0b0c0d0e0f000102030405060708090a0b0c0d0e0f") - height := uint32(0x2134) - e := CreateTransactionEvent(h, height) - assert.Equal(t, Event{ - 0x1, 0x2, 0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, - 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, - 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0x34, 0x21, 0x0, 0x0, - }, e) -} - -func TestCreateAccountChangeEvent(t *testing.T) { - addr, _ := crypto.AddressFromString("pc1p0hrct7eflrpw4ccrttxzs4qud2axex4dcdzdfr") - height := uint32(0x2134) - e := CreateAccountChangeEvent(addr, height) - assert.Equal(t, Event{ - 0x01, 0x03, 0x1, 0x7d, 0xc7, 0x85, 0xfb, 0x29, 0xf8, 0xc2, 0xea, 0xe3, - 0x3, 0x5a, 0xcc, 0x28, 0x54, 0x1c, 0x6a, 0xba, 0x6c, 0x9a, 0xad, 0x34, 0x21, 0x0, 0x0, - }, e) -} diff --git a/www/nanomsg/server.go b/www/nanomsg/server.go deleted file mode 100644 index da3e82629..000000000 --- a/www/nanomsg/server.go +++ /dev/null @@ -1,98 +0,0 @@ -package nanomsg - -import ( - "bytes" - "context" - "net" - - "github.com/pactus-project/pactus/util/encoding" - "github.com/pactus-project/pactus/util/logger" - "github.com/pactus-project/pactus/www/nanomsg/event" - mangos "go.nanomsg.org/mangos/v3" - "go.nanomsg.org/mangos/v3/protocol/pub" - _ "go.nanomsg.org/mangos/v3/transport/all" // register nano ports transports. -) - -type Server struct { - ctx context.Context - cancel context.CancelFunc - config *Config - publisher mangos.Socket - listener net.Listener - logger *logger.SubLogger - eventCh <-chan event.Event - seqNum uint32 -} - -func NewServer(conf *Config, eventCh <-chan event.Event) *Server { - ctx, cancel := context.WithCancel(context.Background()) - - return &Server{ - ctx: ctx, - cancel: cancel, - config: conf, - logger: logger.NewSubLogger("_nonomsg", nil), - eventCh: eventCh, - seqNum: 0, - } -} - -func (s *Server) StartServer() error { - if !s.config.Enable { - return nil - } - publisher, err := pub.NewSocket() - if err != nil { - return err - } - listener, err := publisher.NewListener(s.config.Listen, nil) - if err != nil { - return err - } - err = listener.Listen() - if err != nil { - return err - } - - s.publisher = publisher - - s.logger.Info("nanomsg started listening", "address", listener.Address()) - - go s.eventLoop() - - return nil -} - -func (s *Server) StopServer() { - s.cancel() - s.logger.Debug("context closed", "reason", s.ctx.Err()) - - if s.listener != nil { - _ = s.listener.Close() - } -} - -func (s *Server) eventLoop() { - for { - select { - case <-s.ctx.Done(): - return - - case e := <-s.eventCh: - writer := bytes.NewBuffer(e) - err := encoding.WriteElement(writer, s.seqNum) - if err != nil { - s.logger.Error("error on encoding event", "error", err) - - return - } - err = s.publisher.Send(writer.Bytes()) - if err != nil { - s.logger.Error("error on emitting event", "error", err) - - return - } - s.seqNum++ - } - } -} From 1fd6e59190012877fd2f32d38d210739b7284801 Mon Sep 17 00:00:00 2001 From: Javad Date: Sat, 11 Jan 2025 09:34:24 +0330 Subject: [PATCH 2/2] feat: add event channel zeromq to state --- consensus/consensus_test.go | 10 +++++----- node/node.go | 17 ++++++++++++++++- state/state.go | 15 ++++++++++++++- state/state_test.go | 4 ++-- 4 files changed, 37 insertions(+), 9 deletions(-) diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index fcd5bf1a4..0d8adae12 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -94,16 +94,16 @@ func setupWithSeed(t *testing.T, seed int64) *testData { Add(time.Duration(params.BlockIntervalInSecond) * time.Second) genDoc := genesis.MakeGenesis(getTime, accs, vals, params) stateX, err := state.LoadOrNewState(genDoc, []*bls.ValidatorKey{valKeys[tIndexX]}, - store.MockingStore(ts), txPool) + store.MockingStore(ts), txPool, nil) require.NoError(t, err) stateY, err := state.LoadOrNewState(genDoc, []*bls.ValidatorKey{valKeys[tIndexY]}, - store.MockingStore(ts), txPool) + store.MockingStore(ts), txPool, nil) require.NoError(t, err) stateB, err := state.LoadOrNewState(genDoc, []*bls.ValidatorKey{valKeys[tIndexB]}, - store.MockingStore(ts), txPool) + store.MockingStore(ts), txPool, nil) require.NoError(t, err) stateP, err := state.LoadOrNewState(genDoc, []*bls.ValidatorKey{valKeys[tIndexP]}, - store.MockingStore(ts), txPool) + store.MockingStore(ts), txPool, nil) require.NoError(t, err) consMessages := make([]consMessage, 0) @@ -451,7 +451,7 @@ func TestNotInCommittee(t *testing.T) { valKey := td.RandValKey() store := store.MockingStore(td.TestSuite) - state, _ := state.LoadOrNewState(td.genDoc, []*bls.ValidatorKey{valKey}, store, td.txPool) + state, _ := state.LoadOrNewState(td.genDoc, []*bls.ValidatorKey{valKey}, store, td.txPool, nil) consInt := NewConsensus(testConfig(), state, valKey, valKey.Address(), make(chan message.Message, 100), newConcreteMediator()) cons := consInt.(*consensus) diff --git a/node/node.go b/node/node.go index 91548fafb..50561d2bf 100644 --- a/node/node.go +++ b/node/node.go @@ -1,6 +1,7 @@ package node import ( + "context" "time" "github.com/pactus-project/pactus/config" @@ -21,6 +22,7 @@ import ( "github.com/pactus-project/pactus/www/grpc" "github.com/pactus-project/pactus/www/http" "github.com/pactus-project/pactus/www/jsonrpc" + "github.com/pactus-project/pactus/www/zmq" "github.com/pkg/errors" ) @@ -36,6 +38,8 @@ type Node struct { http *http.Server grpc *grpc.Server jsonrpc *jsonrpc.Server + zeromq *zmq.Server + eventCh chan any } func NewNode(genDoc *genesis.Genesis, conf *config.Config, @@ -51,6 +55,7 @@ func NewNode(genDoc *genesis.Genesis, conf *config.Config, "network", chainType) messageCh := make(chan message.Message, 500) + eventCh := make(chan any) store, err := store.NewStore(conf.Store) if err != nil { @@ -59,7 +64,7 @@ func NewNode(genDoc *genesis.Genesis, conf *config.Config, txPool := txpool.NewTxPool(conf.TxPool, store, messageCh) - state, err := state.LoadOrNewState(genDoc, valKeys, store, txPool) + state, err := state.LoadOrNewState(genDoc, valKeys, store, txPool, eventCh) if err != nil { return nil, err } @@ -84,6 +89,12 @@ func NewNode(genDoc *genesis.Genesis, conf *config.Config, if conf.GRPC.BasicAuth != "" { enableHTTPAuth = true } + + zeromqServer, err := zmq.New(context.TODO(), conf.ZeroMq, eventCh) + if err != nil { + return nil, err + } + grpcServer := grpc.NewServer(conf.GRPC, state, syn, net, consMgr, walletMgr) httpServer := http.NewServer(conf.HTTP, enableHTTPAuth) jsonrpcServer := jsonrpc.NewServer(conf.JSONRPC) @@ -100,6 +111,8 @@ func NewNode(genDoc *genesis.Genesis, conf *config.Config, http: httpServer, grpc: grpcServer, jsonrpc: jsonrpcServer, + zeromq: zeromqServer, + eventCh: eventCh, } return node, nil @@ -154,6 +167,7 @@ func (n *Node) Stop() { // Wait for network to stop time.Sleep(1 * time.Second) + close(n.eventCh) n.consMgr.Stop() n.sync.Stop() n.state.Close() @@ -161,6 +175,7 @@ func (n *Node) Stop() { n.grpc.StopServer() n.http.StopServer() n.jsonrpc.StopServer() + n.zeromq.Close() } // these methods are using by GUI. diff --git a/state/state.go b/state/state.go index e0503262e..b4050bff0 100644 --- a/state/state.go +++ b/state/state.go @@ -48,13 +48,14 @@ type state struct { validatorMerkle *persistentmerkle.Tree scoreMgr *score.Manager logger *logger.SubLogger + eventCh chan<- any } func LoadOrNewState( genDoc *genesis.Genesis, valKeys []*bls.ValidatorKey, store store.Store, - txPool txpool.TxPool, + txPool txpool.TxPool, eventCh chan<- any, ) (Facade, error) { state := &state{ valKeys: valKeys, @@ -65,6 +66,7 @@ func LoadOrNewState( lastInfo: lastinfo.NewLastInfo(), accountMerkle: persistentmerkle.New(), validatorMerkle: persistentmerkle.New(), + eventCh: eventCh, } state.logger = logger.NewSubLogger("_state", state) state.store = store @@ -456,6 +458,9 @@ func (st *state) CommitBlock(blk *block.Block, cert *certificate.BlockCertificat } } + // publish committed block to event channel zeromq + st.publishEvent(blk) + return nil } @@ -741,3 +746,11 @@ func (st *state) IsPruned() bool { func (st *state) PruningHeight() uint32 { return st.store.PruningHeight() } + +func (st *state) publishEvent(msg any) { + if st.eventCh == nil { + return + } + + st.eventCh <- msg +} diff --git a/state/state_test.go b/state/state_test.go index 6b6eb7b6d..0e701c838 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -74,7 +74,7 @@ func setup(t *testing.T) *testData { // First validator is in the committee valKeys := []*bls.ValidatorKey{genValKeys[0], ts.RandValKey()} - st1, err := LoadOrNewState(gnDoc, valKeys, mockStore, mockTxPool) + st1, err := LoadOrNewState(gnDoc, valKeys, mockStore, mockTxPool, nil) require.NoError(t, err) state, _ := st1.(*state) @@ -539,7 +539,7 @@ func TestLoadState(t *testing.T) { // Load last state info newState, err := LoadOrNewState(td.state.genDoc, td.state.valKeys, - td.state.store, td.commonTxPool) + td.state.store, td.commonTxPool, nil) require.NoError(t, err) assert.Equal(t, td.state.TotalAccounts(), newState.TotalAccounts())