Skip to content

Commit

Permalink
network: handle empty wsPeer supplied to transaction handler (algoran…
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy authored Dec 20, 2024
1 parent 3920b49 commit 005495b
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 22 deletions.
5 changes: 5 additions & 0 deletions agreement/fuzzer/networkFacade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ type facadePeer struct {
}

func (p *facadePeer) GetNetwork() network.GossipNode { return p.net }
func (p *facadePeer) RoutingAddr() []byte {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(p.id))
return buf
}

// MakeNetworkFacade creates a facade with a given nodeID.
func MakeNetworkFacade(fuzzer *Fuzzer, nodeID int) *NetworkFacade {
Expand Down
4 changes: 4 additions & 0 deletions catchup/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ func (p *testUnicastPeer) GetAddress() string {

func (p *testUnicastPeer) GetNetwork() network.GossipNode { return p.gn }

func (p *testUnicastPeer) RoutingAddr() []byte {
panic("not implemented")
}

func (p *testUnicastPeer) Request(ctx context.Context, tag protocol.Tag, topics network.Topics) (resp *network.Response, e error) {

responseChannel := make(chan *network.Response, 1)
Expand Down
6 changes: 3 additions & 3 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ func (handler *TxHandler) incomingMsgDupCheck(data []byte) (*crypto.Digest, bool
// Returns:
// - the capacity guard returned by the elastic rate limiter
// - a boolean indicating if the sender is rate limited
func (handler *TxHandler) incomingMsgErlCheck(sender network.DisconnectablePeer) (*util.ErlCapacityGuard, bool) {
func (handler *TxHandler) incomingMsgErlCheck(sender network.DisconnectableAddressablePeer) (*util.ErlCapacityGuard, bool) {
var capguard *util.ErlCapacityGuard
var isCMEnabled bool
var err error
Expand Down Expand Up @@ -715,11 +715,11 @@ func (handler *TxHandler) incomingTxGroupCanonicalDedup(unverifiedTxGroup []tran
}

// incomingTxGroupAppRateLimit checks if the sender is rate limited by the per-application rate limiter.
func (handler *TxHandler) incomingTxGroupAppRateLimit(unverifiedTxGroup []transactions.SignedTxn, sender network.DisconnectablePeer) bool {
func (handler *TxHandler) incomingTxGroupAppRateLimit(unverifiedTxGroup []transactions.SignedTxn, sender network.DisconnectableAddressablePeer) bool {
// rate limit per application in a group. Limiting any app in a group drops the entire message.
if handler.appLimiter != nil {
congestedARL := len(handler.backlogQueue) > handler.appLimiterBacklogThreshold
if congestedARL && handler.appLimiter.shouldDrop(unverifiedTxGroup, sender.(network.IPAddressable).RoutingAddr()) {
if congestedARL && handler.appLimiter.shouldDrop(unverifiedTxGroup, sender.RoutingAddr()) {
transactionMessagesAppLimiterDrop.Inc(nil)
return true
}
Expand Down
2 changes: 1 addition & 1 deletion network/connPerfMon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func makeMsgPool(N int, peers []Peer) (out []IncomingMessage) {

addMsg := func(msgCount int) {
for i := 0; i < msgCount; i++ {
msg.Sender = peers[(int(msgIndex)+i)%len(peers)].(DisconnectablePeer)
msg.Sender = peers[(int(msgIndex)+i)%len(peers)].(DisconnectableAddressablePeer)
timer += int64(7 * time.Nanosecond)
msg.Received = timer
out = append(out, msg)
Expand Down
13 changes: 12 additions & 1 deletion network/gossipNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ type DisconnectablePeer interface {
GetNetwork() GossipNode
}

// DisconnectableAddressablePeer is a Peer with a long-living connection to a network that can be disconnected and has an IP address
type DisconnectableAddressablePeer interface {
DisconnectablePeer
IPAddressable
}

// IPAddressable is addressable with either IPv4 or IPv6 address
type IPAddressable interface {
RoutingAddr() []byte
}

// PeerOption allows users to specify a subset of peers to query
//
//msgp:ignore PeerOption
Expand Down Expand Up @@ -118,7 +129,7 @@ var outgoingMessagesBufferSize = int(

// IncomingMessage represents a message arriving from some peer in our p2p network
type IncomingMessage struct {
Sender DisconnectablePeer
Sender DisconnectableAddressablePeer
Tag Tag
Data []byte
Err error
Expand Down
30 changes: 21 additions & 9 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,23 +943,35 @@ func (n *P2PNetwork) txTopicHandleLoop() {
}
}

type gsPeer struct {
peerID peer.ID
net *P2PNetwork
}

func (p *gsPeer) GetNetwork() GossipNode {
return p.net
}

func (p *gsPeer) RoutingAddr() []byte {
return []byte(p.peerID)
}

// txTopicValidator calls txHandler to validate and process incoming transactions.
func (n *P2PNetwork) txTopicValidator(ctx context.Context, peerID peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
var routingAddr [8]byte
n.wsPeersLock.Lock()
var wsp *wsPeer
var ok bool
if wsp, ok = n.wsPeers[peerID]; ok {
copy(routingAddr[:], wsp.RoutingAddr())
var sender DisconnectableAddressablePeer
if wsp, ok := n.wsPeers[peerID]; ok {
sender = wsp
} else {
// well, otherwise use last 8 bytes of peerID
copy(routingAddr[:], peerID[len(peerID)-8:])
// otherwise use the peerID to handle the case where this peer is not in the wsPeers map yet
// this can happen when pubsub receives new peer notifications before the wsStreamHandler is called:
// create a fake peer that is good enough for tx handler to work with.
sender = &gsPeer{peerID: peerID, net: n}
}
n.wsPeersLock.Unlock()

inmsg := IncomingMessage{
// Sender: gossipSubPeer{peerID: msg.ReceivedFrom, net: n, routingAddr: routingAddr},
Sender: wsp,
Sender: sender,
Tag: protocol.TxnTag,
Data: msg.Data,
Net: n,
Expand Down
31 changes: 31 additions & 0 deletions network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/algorand/go-algorand/test/partitiontest"

pubsub "github.com/libp2p/go-libp2p-pubsub"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -1374,3 +1375,33 @@ func TestP2PEnableGossipService_BothDisable(t *testing.T) {
require.False(t, netA.hasPeers())
require.False(t, netB.hasPeers())
}

// TestP2PTxTopicValidator_NoWsPeer checks txTopicValidator does not call tx handler with empty Sender
func TestP2PTxTopicValidator_NoWsPeer(t *testing.T) {
partitiontest.PartitionTest(t)

log := logging.TestingLog(t)

// prepare configs
cfg := config.GetDefaultLocal()
cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses

net, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
require.NoError(t, err)

peerID := peer.ID("12345678") // must be 8+ in size
msg := pubsub.Message{Message: &pb.Message{}, ID: string(peerID)}
validateIncomingTxMessage := func(rawmsg IncomingMessage) OutgoingMessage {
require.NotEmpty(t, rawmsg.Sender)
require.Implements(t, (*DisconnectableAddressablePeer)(nil), rawmsg.Sender)
return OutgoingMessage{Action: Accept}
}
net.handler.RegisterValidatorHandlers([]TaggedMessageValidatorHandler{
{Tag: protocol.TxnTag, MessageHandler: ValidateHandleFunc(validateIncomingTxMessage)},
})

ctx := context.Background()
require.NotContains(t, net.wsPeers, peerID)
res := net.txTopicValidator(ctx, peerID, &msg)
require.Equal(t, pubsub.ValidationAccept, res)
}
6 changes: 3 additions & 3 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ const (
type broadcastRequest struct {
tags []Tag
data [][]byte
except *wsPeer
except Peer
done chan struct{}
enqueueTime time.Time
ctx context.Context
Expand Down Expand Up @@ -381,7 +381,7 @@ func (wn *msgBroadcaster) BroadcastArray(ctx context.Context, tags []protocol.Ta

request := broadcastRequest{tags: tags, data: data, enqueueTime: time.Now(), ctx: ctx}
if except != nil {
request.except = except.(*wsPeer)
request.except = except
}

broadcastQueue := wn.broadcastQueueBulk
Expand Down Expand Up @@ -1401,7 +1401,7 @@ func (wn *msgBroadcaster) innerBroadcast(request broadcastRequest, prio bool, pe
if wn.config.BroadcastConnectionsLimit >= 0 && sentMessageCount >= wn.config.BroadcastConnectionsLimit {
break
}
if peer == request.except {
if Peer(peer) == request.except {
continue
}
ok := peer.writeNonBlockMsgs(request.ctx, data, prio, digests, request.enqueueTime)
Expand Down
37 changes: 37 additions & 0 deletions network/wsNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4639,3 +4639,40 @@ func TestWebsocketNetworkHTTPClient(t *testing.T) {
_, err = netB.GetHTTPClient("invalid")
require.Error(t, err)
}

// TestPeerComparisonInBroadcast tests that the peer comparison in the broadcast function works as expected
// when casting wsPeer to Peer (interface{}) type.
func TestPeerComparisonInBroadcast(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

log := logging.TestingLog(t)
conf := config.GetDefaultLocal()
wn := &WebsocketNetwork{
log: log,
config: conf,
ctx: context.Background(),
}
wn.setup()

testPeer := &wsPeer{
wsPeerCore: makePeerCore(wn.ctx, wn, log, nil, "test-addr", nil, ""),
sendBufferBulk: make(chan sendMessages, sendBufferLength),
}
exceptPeer := &wsPeer{
wsPeerCore: makePeerCore(wn.ctx, wn, log, nil, "except-addr", nil, ""),
sendBufferBulk: make(chan sendMessages, sendBufferLength),
}

request := broadcastRequest{
tags: []protocol.Tag{"test-tag"},
data: [][]byte{[]byte("test-data")},
enqueueTime: time.Now(),
except: exceptPeer,
}

wn.broadcaster.innerBroadcast(request, false, []*wsPeer{testPeer, exceptPeer})

require.Equal(t, 1, len(testPeer.sendBufferBulk))
require.Equal(t, 0, len(exceptPeer.sendBufferBulk))
}
12 changes: 7 additions & 5 deletions network/wsPeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ type wsPeer struct {

// closers is a slice of functions to run when the peer is closed
closers []func()
// closersMu synchronizes access to closers
closersMu deadlock.RWMutex

// peerType defines the peer's underlying connection type
// used for separate p2p vs ws metrics
Expand All @@ -295,11 +297,6 @@ type HTTPPeer interface {
GetHTTPClient() *http.Client
}

// IPAddressable is addressable with either IPv4 or IPv6 address
type IPAddressable interface {
RoutingAddr() []byte
}

// UnicastPeer is another possible interface for the opaque Peer.
// It is possible that we can only initiate a connection to a peer over websockets.
type UnicastPeer interface {
Expand Down Expand Up @@ -979,6 +976,8 @@ L:
}

}
wp.closersMu.RLock()
defer wp.closersMu.RUnlock()
// now call all registered closers
for _, f := range wp.closers {
f()
Expand Down Expand Up @@ -1115,6 +1114,9 @@ func (wp *wsPeer) sendMessagesOfInterest(messagesOfInterestGeneration uint32, me
}

func (wp *wsPeer) OnClose(f func()) {
wp.closersMu.Lock()
defer wp.closersMu.Unlock()

if wp.closers == nil {
wp.closers = []func(){}
}
Expand Down
4 changes: 4 additions & 0 deletions rpcs/blockService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func (mup *mockUnicastPeer) GetNetwork() network.GossipNode {
panic("not implemented")
}

func (mup *mockUnicastPeer) RoutingAddr() []byte {
panic("not implemented")
}

// TestHandleCatchupReqNegative covers the error reporting in handleCatchupReq
func TestHandleCatchupReqNegative(t *testing.T) {
partitiontest.PartitionTest(t)
Expand Down

0 comments on commit 005495b

Please sign in to comment.