diff --git a/.github/workflows/semantic-pr.yml b/.github/workflows/semantic-pr.yml index 6c65462ff..c9d8a4b23 100644 --- a/.github/workflows/semantic-pr.yml +++ b/.github/workflows/semantic-pr.yml @@ -59,7 +59,6 @@ jobs: proto http jsonrpc - nanomsg zeromq windows linux diff --git a/config/config.go b/config/config.go index e6bb64a73..fff0f672b 100644 --- a/config/config.go +++ b/config/config.go @@ -19,7 +19,6 @@ import ( "github.com/pactus-project/pactus/www/grpc" "github.com/pactus-project/pactus/www/http" "github.com/pactus-project/pactus/www/jsonrpc" - "github.com/pactus-project/pactus/www/nanomsg" "github.com/pactus-project/pactus/www/zmq" "github.com/pelletier/go-toml/v2" ) @@ -47,7 +46,6 @@ type Config struct { JSONRPC *jsonrpc.Config `toml:"jsonrpc"` HTTP *http.Config `toml:"http"` WalletManager *wallet.Config `toml:"-"` - Nanomsg *nanomsg.Config `toml:"nanomsg"` ZeroMq *zmq.Config `toml:"zeromq"` } @@ -100,7 +98,6 @@ func defaultConfig() *Config { GRPC: grpc.DefaultConfig(), JSONRPC: jsonrpc.DefaultConfig(), HTTP: http.DefaultConfig(), - Nanomsg: nanomsg.DefaultConfig(), ZeroMq: zmq.DefaultConfig(), WalletManager: wallet.DefaultConfig(), } @@ -156,8 +153,6 @@ func DefaultConfigMainnet() *Config { conf.HTTP.Enable = false conf.HTTP.Listen = "127.0.0.1:80" conf.HTTP.EnablePprof = false - conf.Nanomsg.Enable = false - conf.Nanomsg.Listen = "tcp://127.0.0.1:40899" return conf } @@ -192,8 +187,6 @@ func DefaultConfigTestnet() *Config { conf.HTTP.Enable = false conf.HTTP.Listen = "[::]:80" conf.HTTP.EnablePprof = false - conf.Nanomsg.Enable = false - conf.Nanomsg.Listen = "tcp://[::]:40799" return conf } @@ -220,8 +213,6 @@ func DefaultConfigLocalnet() *Config { conf.HTTP.Enable = true conf.HTTP.Listen = "[::]:0" conf.HTTP.EnablePprof = true - conf.Nanomsg.Enable = true - conf.Nanomsg.Listen = "tcp://[::]:40799" conf.ZeroMq.ZmqPubBlockInfo = "tcp://127.0.0.1:28332" conf.ZeroMq.ZmqPubTxInfo = "tcp://127.0.0.1:28333" conf.ZeroMq.ZmqPubRawBlock = "tcp://127.0.0.1:28334" @@ -295,9 +286,6 @@ func (conf *Config) BasicCheck() error { if err := conf.Sync.BasicCheck(); err != nil { return err } - if err := conf.Nanomsg.BasicCheck(); err != nil { - return err - } if err := conf.JSONRPC.BasicCheck(); err != nil { return err } diff --git a/config/config_test.go b/config/config_test.go index 3315d3750..c1596974c 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -48,12 +48,10 @@ func TestDefaultConfig(t *testing.T) { assert.False(t, conf.GRPC.Enable) assert.False(t, conf.GRPC.Gateway.Enable) assert.False(t, conf.HTTP.Enable) - assert.False(t, conf.Nanomsg.Enable) assert.Zero(t, conf.GRPC.Listen) assert.Zero(t, conf.GRPC.Gateway.Listen) assert.Zero(t, conf.HTTP.Listen) - assert.Zero(t, conf.Nanomsg.Listen) } func TestMainnetConfig(t *testing.T) { @@ -67,12 +65,10 @@ func TestMainnetConfig(t *testing.T) { assert.True(t, conf.GRPC.Enable) assert.False(t, conf.GRPC.Gateway.Enable) assert.False(t, conf.HTTP.Enable) - assert.False(t, conf.Nanomsg.Enable) assert.Equal(t, "127.0.0.1:50051", conf.GRPC.Listen) assert.Equal(t, "127.0.0.1:8080", conf.GRPC.Gateway.Listen) assert.Equal(t, "127.0.0.1:80", conf.HTTP.Listen) - assert.Equal(t, "tcp://127.0.0.1:40899", conf.Nanomsg.Listen) } func TestTestnetConfig(t *testing.T) { @@ -86,12 +82,10 @@ func TestTestnetConfig(t *testing.T) { assert.True(t, conf.GRPC.Enable) assert.True(t, conf.GRPC.Gateway.Enable) assert.False(t, conf.HTTP.Enable) - assert.False(t, conf.Nanomsg.Enable) assert.Equal(t, "[::]:50052", conf.GRPC.Listen) assert.Equal(t, "[::]:8080", conf.GRPC.Gateway.Listen) assert.Equal(t, "[::]:80", conf.HTTP.Listen) - assert.Equal(t, "tcp://[::]:40799", conf.Nanomsg.Listen) } func TestLocalnetConfig(t *testing.T) { @@ -105,12 +99,10 @@ func TestLocalnetConfig(t *testing.T) { assert.True(t, conf.GRPC.Enable) assert.True(t, conf.GRPC.Gateway.Enable) assert.True(t, conf.HTTP.Enable) - assert.True(t, conf.Nanomsg.Enable) assert.Equal(t, "[::]:50052", conf.GRPC.Listen) assert.Equal(t, "[::]:8080", conf.GRPC.Gateway.Listen) assert.Equal(t, "[::]:0", conf.HTTP.Listen) - assert.Equal(t, "tcp://[::]:40799", conf.Nanomsg.Listen) } func TestLoadFromFile(t *testing.T) { diff --git a/config/example_config.toml b/config/example_config.toml index f01b25dc6..f04cb5cb0 100644 --- a/config/example_config.toml +++ b/config/example_config.toml @@ -238,16 +238,6 @@ # data, so enable only in secure environments with restricted access. enable_pprof = false -# Nanomsg configuration. -[nanomsg] - - # `enable` indicates whether nanomsg service should be enabled or not. - # Default is `false`. - enable = false - - # `listen` is the address for incoming connections to the nanomsg server. - listen = 'tcp://127.0.0.1:40899' - # ZeroMQ configuration. [zeromq] diff --git a/go.mod b/go.mod index 7ef1a78e6..44d8610b3 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,6 @@ require ( github.com/stretchr/testify v1.10.0 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 github.com/tyler-smith/go-bip39 v1.1.0 - go.nanomsg.org/mangos/v3 v3.4.2 golang.org/x/crypto v0.31.0 golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 golang.org/x/term v0.27.0 @@ -47,7 +46,6 @@ require ( ) require ( - github.com/Microsoft/go-winio v0.6.2 // indirect github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.20.0 // indirect diff --git a/go.sum b/go.sum index a63aa3964..7b3bb0260 100644 --- a/go.sum +++ b/go.sum @@ -8,9 +8,6 @@ dmitri.shuralyov.com/service/change v0.0.0-20181023043359-a85b471d5412/go.mod h1 dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D6DFvNNtx+9ybjezNCa8XF0xaYcETyp6rHWU= git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY= -github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= -github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/NathanBaulch/protoc-gen-cobra v1.2.1 h1:BOqX9glwicbqDJDGndMnhHhx8psGTSjGdZzRDY1a7A8= github.com/NathanBaulch/protoc-gen-cobra v1.2.1/go.mod h1:ZLPLEPQgV3jP3a7IEp+xxYPk8tF4lhY9ViV0hn6K3iA= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= @@ -101,7 +98,6 @@ github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4 github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= -github.com/gdamore/optopia v0.2.0/go.mod h1:YKYEwo5C1Pa617H7NlPcmQXl+vG6YnSSNB44n8dNL0Q= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= @@ -186,7 +182,6 @@ github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyE github.com/gorilla/handlers v1.5.2/go.mod h1:dX+xVpaxdSw+q0Qek8SSsl3dfMk3jNddUkMzo0GtH0w= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= -github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gotk3/gotk3 v0.6.2 h1:sx/PjaKfKULJPTPq8p2kn2ZbcNFxpOJqi4VLzMbEOO8= @@ -573,8 +568,6 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -go.nanomsg.org/mangos/v3 v3.4.2 h1:gHlopxjWvJcVCcUilQIsRQk9jdj6/HB7wrTiUN8Ki7Q= -go.nanomsg.org/mangos/v3 v3.4.2/go.mod h1:8+hjBMQub6HvXmuGvIq6hf19uxGQIjCofmc62lbedLA= go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= diff --git a/node/node.go b/node/node.go index 4c54525cc..50561d2bf 100644 --- a/node/node.go +++ b/node/node.go @@ -1,6 +1,7 @@ package node import ( + "context" "time" "github.com/pactus-project/pactus/config" @@ -21,8 +22,7 @@ import ( "github.com/pactus-project/pactus/www/grpc" "github.com/pactus-project/pactus/www/http" "github.com/pactus-project/pactus/www/jsonrpc" - "github.com/pactus-project/pactus/www/nanomsg" - "github.com/pactus-project/pactus/www/nanomsg/event" + "github.com/pactus-project/pactus/www/zmq" "github.com/pkg/errors" ) @@ -38,7 +38,8 @@ type Node struct { http *http.Server grpc *grpc.Server jsonrpc *jsonrpc.Server - nanomsg *nanomsg.Server + zeromq *zmq.Server + eventCh chan any } func NewNode(genDoc *genesis.Genesis, conf *config.Config, @@ -54,10 +55,7 @@ func NewNode(genDoc *genesis.Genesis, conf *config.Config, "network", chainType) messageCh := make(chan message.Message, 500) - eventCh := make(chan event.Event, 500) - if !conf.Nanomsg.Enable { - eventCh = nil - } + eventCh := make(chan any) store, err := store.NewStore(conf.Store) if err != nil { @@ -91,10 +89,15 @@ func NewNode(genDoc *genesis.Genesis, conf *config.Config, if conf.GRPC.BasicAuth != "" { enableHTTPAuth = true } + + zeromqServer, err := zmq.New(context.TODO(), conf.ZeroMq, eventCh) + if err != nil { + return nil, err + } + grpcServer := grpc.NewServer(conf.GRPC, state, syn, net, consMgr, walletMgr) httpServer := http.NewServer(conf.HTTP, enableHTTPAuth) jsonrpcServer := jsonrpc.NewServer(conf.JSONRPC) - nanomsgServer := nanomsg.NewServer(conf.Nanomsg, eventCh) node := &Node{ config: conf, @@ -108,7 +111,8 @@ func NewNode(genDoc *genesis.Genesis, conf *config.Config, http: httpServer, grpc: grpcServer, jsonrpc: jsonrpcServer, - nanomsg: nanomsgServer, + zeromq: zeromqServer, + eventCh: eventCh, } return node, nil @@ -152,11 +156,6 @@ func (n *Node) Start() error { return errors.Wrap(err, "could not start JSON-RPC server") } - err = n.nanomsg.StartServer() - if err != nil { - return errors.Wrap(err, "could not start Nanomsg server") - } - return nil } @@ -168,6 +167,7 @@ func (n *Node) Stop() { // Wait for network to stop time.Sleep(1 * time.Second) + close(n.eventCh) n.consMgr.Stop() n.sync.Stop() n.state.Close() @@ -175,7 +175,7 @@ func (n *Node) Stop() { n.grpc.StopServer() n.http.StopServer() n.jsonrpc.StopServer() - n.nanomsg.StopServer() + n.zeromq.Close() } // these methods are using by GUI. diff --git a/node/node_test.go b/node/node_test.go index 40343064a..6dde27471 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -39,8 +39,6 @@ func TestRunningNode(t *testing.T) { conf.HTTP.Listen = "0.0.0.0:0" conf.JSONRPC.Enable = true conf.JSONRPC.Listen = "0.0.0.0:0" - conf.Nanomsg.Enable = true - conf.Nanomsg.Listen = "tcp://0.0.0.0:0" conf.Store.Path = util.TempDirPath() conf.Network.EnableRelay = false conf.Network.NetworkKey = util.TempFilePath() diff --git a/state/state.go b/state/state.go index 28aa678a6..b4050bff0 100644 --- a/state/state.go +++ b/state/state.go @@ -31,7 +31,6 @@ import ( "github.com/pactus-project/pactus/util/logger" "github.com/pactus-project/pactus/util/persistentmerkle" "github.com/pactus-project/pactus/util/simplemerkle" - "github.com/pactus-project/pactus/www/nanomsg/event" ) type state struct { @@ -49,14 +48,14 @@ type state struct { validatorMerkle *persistentmerkle.Tree scoreMgr *score.Manager logger *logger.SubLogger - eventCh chan event.Event + eventCh chan<- any } func LoadOrNewState( genDoc *genesis.Genesis, valKeys []*bls.ValidatorKey, store store.Store, - txPool txpool.TxPool, eventCh chan event.Event, + txPool txpool.TxPool, eventCh chan<- any, ) (Facade, error) { state := &state{ valKeys: valKeys, @@ -459,9 +458,8 @@ func (st *state) CommitBlock(blk *block.Block, cert *certificate.BlockCertificat } } - // ----------------------------------- - // Publishing the events to the nano message. - st.publishEvents(height, blk) + // publish committed block to event channel zeromq + st.publishEvent(blk) return nil } @@ -716,30 +714,6 @@ func (st *state) Params() *param.Params { return st.params } -// publishEvents publishes block related events. -func (st *state) publishEvents(height uint32, blk *block.Block) { - if st.eventCh == nil { - return - } - blockEvent := event.CreateBlockEvent(blk.Hash(), height) - st.eventCh <- blockEvent - - for i := 1; i < blk.Transactions().Len(); i++ { - transaction := blk.Transactions().Get(i) - - senderChangeEvent := event.CreateAccountChangeEvent(transaction.Payload().Signer(), height) - st.eventCh <- senderChangeEvent - - if transaction.Payload().Receiver() != nil { - receiverChangeEvent := event.CreateAccountChangeEvent(*transaction.Payload().Receiver(), height) - st.eventCh <- receiverChangeEvent - } - - txEvent := event.CreateTransactionEvent(transaction.ID(), height) - st.eventCh <- txEvent - } -} - func (st *state) CalculateFee(amt amount.Amount, payloadType payload.Type) amount.Amount { return st.txPool.EstimatedFee(amt, payloadType) } @@ -772,3 +746,11 @@ func (st *state) IsPruned() bool { func (st *state) PruningHeight() uint32 { return st.store.PruningHeight() } + +func (st *state) publishEvent(msg any) { + if st.eventCh == nil { + return + } + + st.eventCh <- msg +} diff --git a/www/nanomsg/config.go b/www/nanomsg/config.go deleted file mode 100644 index 50c316844..000000000 --- a/www/nanomsg/config.go +++ /dev/null @@ -1,18 +0,0 @@ -package nanomsg - -type Config struct { - Enable bool `toml:"enable"` - Listen string `toml:"listen"` -} - -func DefaultConfig() *Config { - return &Config{ - Enable: false, - Listen: "", - } -} - -// BasicCheck performs basic checks on the configuration. -func (*Config) BasicCheck() error { - return nil -} diff --git a/www/nanomsg/event/event.go b/www/nanomsg/event/event.go deleted file mode 100644 index 45ee601cb..000000000 --- a/www/nanomsg/event/event.go +++ /dev/null @@ -1,58 +0,0 @@ -package event - -import ( - "bytes" - - "github.com/pactus-project/pactus/crypto" - "github.com/pactus-project/pactus/crypto/hash" - "github.com/pactus-project/pactus/types/tx" - "github.com/pactus-project/pactus/util/encoding" - "github.com/pactus-project/pactus/util/logger" -) - -const ( - TopicBlock = uint16(0x0101) - TopicTransaction = uint16(0x0201) - TopicAccountChange = uint16(0x0301) -) - -type Event []byte - -// CreateBlockEvent creates an event when the new block is committed. -// The block event structure is like : -// . -func CreateBlockEvent(blockHash hash.Hash, height uint32) Event { - buf := bytes.NewBuffer(make([]byte, 0, 42)) - err := encoding.WriteElements(buf, TopicBlock, blockHash, height) - if err != nil { - logger.Error("error on encoding event in new block", "error", err) - } - - return buf.Bytes() -} - -// CreateTransactionEvent creates an event when a new transaction sent. -// The new transaction event structure is like : -// . -func CreateTransactionEvent(txHash tx.ID, height uint32) Event { - buf := bytes.NewBuffer(make([]byte, 0, 42)) - err := encoding.WriteElements(buf, TopicTransaction, txHash, height) - if err != nil { - logger.Error("error on encoding event in new transaction", "error", err) - } - - return buf.Bytes() -} - -// CreateAccountChangeEvent creates an event when the new account is created. -// The account event structure is like : -// . -func CreateAccountChangeEvent(accountAddr crypto.Address, height uint32) Event { - buf := bytes.NewBuffer(make([]byte, 0, 42)) - err := encoding.WriteElements(buf, TopicAccountChange, accountAddr, height) - if err != nil { - logger.Error("error on encoding event in new account", "error", err) - } - - return buf.Bytes() -} diff --git a/www/nanomsg/event/event_test.go b/www/nanomsg/event/event_test.go deleted file mode 100644 index eec2bc1ee..000000000 --- a/www/nanomsg/event/event_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package event - -import ( - "testing" - - "github.com/pactus-project/pactus/crypto" - "github.com/pactus-project/pactus/crypto/hash" - "github.com/stretchr/testify/assert" -) - -func TestCreateBlockEvent(t *testing.T) { - h, _ := hash.FromString("000102030405060708090a0b0c0d0e0f000102030405060708090a0b0c0d0e0f") - height := uint32(0x2134) - e := CreateBlockEvent(h, height) - assert.Equal(t, Event{ - 0x1, 0x1, 0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, - 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, - 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0x34, 0x21, 0x0, 0x0, - }, e) -} - -func TestCreateNewTransactionEvent(t *testing.T) { - h, _ := hash.FromString("000102030405060708090a0b0c0d0e0f000102030405060708090a0b0c0d0e0f") - height := uint32(0x2134) - e := CreateTransactionEvent(h, height) - assert.Equal(t, Event{ - 0x1, 0x2, 0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, - 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, - 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0x34, 0x21, 0x0, 0x0, - }, e) -} - -func TestCreateAccountChangeEvent(t *testing.T) { - addr, _ := crypto.AddressFromString("pc1p0hrct7eflrpw4ccrttxzs4qud2axex4dcdzdfr") - height := uint32(0x2134) - e := CreateAccountChangeEvent(addr, height) - assert.Equal(t, Event{ - 0x01, 0x03, 0x1, 0x7d, 0xc7, 0x85, 0xfb, 0x29, 0xf8, 0xc2, 0xea, 0xe3, - 0x3, 0x5a, 0xcc, 0x28, 0x54, 0x1c, 0x6a, 0xba, 0x6c, 0x9a, 0xad, 0x34, 0x21, 0x0, 0x0, - }, e) -} diff --git a/www/nanomsg/server.go b/www/nanomsg/server.go deleted file mode 100644 index da3e82629..000000000 --- a/www/nanomsg/server.go +++ /dev/null @@ -1,98 +0,0 @@ -package nanomsg - -import ( - "bytes" - "context" - "net" - - "github.com/pactus-project/pactus/util/encoding" - "github.com/pactus-project/pactus/util/logger" - "github.com/pactus-project/pactus/www/nanomsg/event" - mangos "go.nanomsg.org/mangos/v3" - "go.nanomsg.org/mangos/v3/protocol/pub" - _ "go.nanomsg.org/mangos/v3/transport/all" // register nano ports transports. -) - -type Server struct { - ctx context.Context - cancel context.CancelFunc - config *Config - publisher mangos.Socket - listener net.Listener - logger *logger.SubLogger - eventCh <-chan event.Event - seqNum uint32 -} - -func NewServer(conf *Config, eventCh <-chan event.Event) *Server { - ctx, cancel := context.WithCancel(context.Background()) - - return &Server{ - ctx: ctx, - cancel: cancel, - config: conf, - logger: logger.NewSubLogger("_nonomsg", nil), - eventCh: eventCh, - seqNum: 0, - } -} - -func (s *Server) StartServer() error { - if !s.config.Enable { - return nil - } - publisher, err := pub.NewSocket() - if err != nil { - return err - } - listener, err := publisher.NewListener(s.config.Listen, nil) - if err != nil { - return err - } - err = listener.Listen() - if err != nil { - return err - } - - s.publisher = publisher - - s.logger.Info("nanomsg started listening", "address", listener.Address()) - - go s.eventLoop() - - return nil -} - -func (s *Server) StopServer() { - s.cancel() - s.logger.Debug("context closed", "reason", s.ctx.Err()) - - if s.listener != nil { - _ = s.listener.Close() - } -} - -func (s *Server) eventLoop() { - for { - select { - case <-s.ctx.Done(): - return - - case e := <-s.eventCh: - writer := bytes.NewBuffer(e) - err := encoding.WriteElement(writer, s.seqNum) - if err != nil { - s.logger.Error("error on encoding event", "error", err) - - return - } - err = s.publisher.Send(writer.Bytes()) - if err != nil { - s.logger.Error("error on emitting event", "error", err) - - return - } - s.seqNum++ - } - } -}