Merge unstable into master #1003

Status: Merged. 46 commits merged from unstable into master on Dec 18, 2023.

Commits (46)
c76d1e1
Remove nim 1.2 support (#907)
Menduist Jun 7, 2023
32085ca
Allow to override TCP connection timeouts (#903)
Menduist Jun 7, 2023
3640b4d
Autonat and HP changes (#899)
diegomrsantos Jun 7, 2023
3725f6a
chore: add basic metrics for rendezvous (#905)
vpavlin Jun 7, 2023
eb78292
Bump deps (#896)
Menduist Jun 7, 2023
c28d8bb
WS Transport: handle 'tls/ws' (#914)
Menduist Jun 12, 2023
6771147
Consider dns as public address (#913)
diegomrsantos Jun 13, 2023
a65b7b0
GossipSub: remove peer if we can't establish sendConn (#886)
Menduist Jun 14, 2023
49dfa84
Transports: handle TransportAbortedError properly (#916)
Menduist Jun 14, 2023
9d4c430
Bumper: fix case where target is up to date (#917)
Menduist Jun 15, 2023
5efa089
TCP transport: handle getObservedAddr errors (#918)
Menduist Jun 20, 2023
224f92e
Fix #916 regression causing accept loop lockup (#919)
Menduist Jun 20, 2023
1c4d083
Add GossipSub ping (#912)
Menduist Jun 21, 2023
66f9dc9
Remove all `Result.get()`s & `Option` -> `Opt` (#902)
Menduist Jun 28, 2023
f85d0f7
Handling Opt[PeerId] in logging (#923)
diegomrsantos Jun 28, 2023
43359dd
standard nimble env vars (#921)
arnetheduck Jun 28, 2023
81b861b
avoid `ProveField` warning in `crypto.init` (#915)
etan-status Jun 29, 2023
c45f970
Gossipsub scoring improvements (#909)
diegomrsantos Jul 3, 2023
74c402e
wstransport.nim: avoid re-raising 'TransportOsError' to avoid stoppin…
Ivansete-status Jul 7, 2023
1721f07
Fix crash on empty write (#930)
Menduist Jul 10, 2023
fab1340
avoid a few zeroMem (#932)
arnetheduck Jul 11, 2023
440461b
GS: improve handleIHave (#922)
Menduist Jul 11, 2023
b784167
GossipSub: IDontWant (#934)
Menduist Jul 28, 2023
7a369dd
GossipSub: Limit flood publishing (#911)
Menduist Jul 31, 2023
56c23a2
Add specs crypto tests (#938)
Menduist Aug 1, 2023
d6263bf
nim-websock new version compatibility (#939)
Menduist Aug 2, 2023
f80ce31
Bandwidth estimate as a parameter (#941)
diegomrsantos Aug 14, 2023
e03547e
Perf protocol (#925)
lchenut Aug 14, 2023
c6aa085
Prevent concurrent IWANT of the same message (#943)
diegomrsantos Aug 21, 2023
b76bac7
avoid importing `ecnist` when not needed (#942)
arnetheduck Aug 30, 2023
545a31d
Bump dependencies (#947)
Menduist Sep 6, 2023
ff77d52
IDontWant metrics (#946)
Menduist Sep 6, 2023
20b0e40
Fix doc generation CI (#948)
Menduist Sep 8, 2023
b2eac7e
GS: Relay messages to direct peers (#949)
Menduist Sep 15, 2023
56599f5
GossipSub Traffic scoring (#920)
diegomrsantos Sep 22, 2023
61929ae
Improve rdv advertise (#951)
diegomrsantos Sep 27, 2023
7587181
Split msgs in iwant response if bigger than limit (#944)
diegomrsantos Oct 2, 2023
575344e
Update interop CI name (#956)
Menduist Oct 5, 2023
459f685
Add a flag if a peer should be disconnected when above rate limit (#954)
diegomrsantos Oct 5, 2023
18b0f72
Rate Limit tests (#953)
diegomrsantos Oct 5, 2023
60f9536
Remove ConnManager from Upgrade (#959)
Menduist Oct 13, 2023
fc4e9a8
Fix WS transport when the connection aborts (#967)
lchenut Oct 23, 2023
3fc1236
Revert "Prevent concurrent IWANT of the same message (#943)" (#977)
diegomrsantos Nov 3, 2023
ebef85c
Rate limit fixes (#965)
diegomrsantos Nov 9, 2023
5f7a3ab
fix: doc workflow (#985)
diegomrsantos Nov 16, 2023
4e7eaba
fix chronos v4 compat (#982)
arnetheduck Nov 16, 2023
Changes from 1 commit: Split msgs in iwant response if bigger than limit (#944)
diegomrsantos authored Oct 2, 2023
commit 75871817ee39e23bb0f34f6435feb19d21db00d0
2 changes: 1 addition & 1 deletion libp2p/protocols/pubsub/gossipsub.nim
@@ -153,7 +153,7 @@ method init*(g: GossipSub) =
g.codecs &= GossipSubCodec
g.codecs &= GossipSubCodec_10

method onNewPeer(g: GossipSub, peer: PubSubPeer) =
method onNewPeer*(g: GossipSub, peer: PubSubPeer) =
g.withPeerStats(peer.peerId) do (stats: var PeerStats):
# Make sure stats and peer information match, even when reloading peer stats
# from a previous connection
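
The only functional change in this file is the export marker added to onNewPeer, making the method visible outside the gossipsub module; the reworked tests further down stop include-ing gossipsub and pull helpers from utils instead, which presumably need to call onNewPeer across module boundaries. A minimal sketch of Nim's export marker, using hypothetical module and proc names purely to illustrate the mechanism:

# widget.nim (hypothetical module)
proc exported*(x: int): int = x + 1   # the `*` makes this callable from importing modules
proc internal(x: int): int = x - 1    # no `*`: private to widget.nim

# main.nim (hypothetical)
# import widget
# echo exported(1)   # compiles
# echo internal(1)   # error: undeclared identifier: 'internal'
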
47 changes: 43 additions & 4 deletions libp2p/protocols/pubsub/pubsubpeer.nim
@@ -242,7 +242,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [], async.} =
return

if msg.len > p.maxMessageSize:
info "trying to send a too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
return

if p.sendConn == nil:
@@ -269,9 +269,42 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [], async.} =

await conn.close() # This will clean up the send connection

proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
## Each new `RPCMsg` accumulates Messages until reaching the specified `maxSize`. If a single Message
## exceeds the `maxSize` when trying to fit into an empty `RPCMsg`, the latter is skipped as too large to send.
## Every constructed `RPCMsg` is then encoded, optionally anonymized, and yielded as a sequence of bytes.

var currentRPCMsg = rpcMsg
currentRPCMsg.messages = newSeq[Message]()

var currentSize = byteSize(currentRPCMsg)

for msg in rpcMsg.messages:
let msgSize = byteSize(msg)

# Check if adding the next message will exceed maxSize
if float(currentSize + msgSize) * 1.1 > float(maxSize): # Guessing 10% protobuf overhead
if currentRPCMsg.messages.len == 0:
trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)
continue # Skip this message

trace "sending msg to peer", peer, rpcMsg = shortLog(currentRPCMsg)
yield encodeRpcMsg(currentRPCMsg, anonymize)
currentRPCMsg = RPCMsg()
currentSize = 0

currentRPCMsg.messages.add(msg)
currentSize += msgSize

# Check if there is a non-empty currentRPCMsg left to be added
if currentSize > 0 and currentRPCMsg.messages.len > 0:
trace "sending msg to peer", peer, rpcMsg = shortLog(currentRPCMsg)
yield encodeRpcMsg(currentRPCMsg, anonymize)
else:
trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)

proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
# When sending messages, we take care to re-encode them with the right
# anonymization flag to ensure that we're not penalized for sending invalid
# or malicious data on the wire - in particular, re-encoding protects against
@@ -289,7 +322,13 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
sendMetrics(msg)
encodeRpcMsg(msg, anonymize)

asyncSpawn p.sendEncoded(encoded)
if encoded.len > p.maxMessageSize and msg.messages.len > 1:
for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
asyncSpawn p.sendEncoded(encodedSplitMsg)
else:
# If the message size is within limits, send it as is
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
asyncSpawn p.sendEncoded(encoded)

proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
for sentIHave in p.sentIHaves.mitems():
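
For readers skimming the diff, the core of the change is the splitRPCMsg iterator plus the new branch in send: when an encoded RPCMsg exceeds maxMessageSize and carries more than one Message, it is repackaged into several smaller RPCMsgs that are sent separately. Below is a self-contained sketch of the same greedy batching strategy, with messages reduced to plain byte sequences and no protobuf encoding, so the names and sizes are illustrative only:

iterator splitBySize(msgs: seq[seq[byte]], maxSize: int): seq[seq[byte]] =
  ## Greedily packs messages into batches whose estimated wire size stays
  ## under maxSize, mirroring the splitRPCMsg logic above.
  var batch: seq[seq[byte]] = @[]
  var batchSize = 0
  for msg in msgs:
    # Same 10% guess for framing overhead as the patch uses for protobuf.
    if float(batchSize + msg.len) * 1.1 > float(maxSize):
      if batch.len == 0:
        continue              # a lone message that is already too big is skipped
      yield batch             # flush the current batch and start a new one
      batch = @[]
      batchSize = 0
    batch.add(msg)
    batchSize += msg.len
  if batch.len > 0:
    yield batch               # flush whatever is left

when isMainModule:
  # One oversized message (dropped) followed by four that get packed two by two.
  let msgs = @[newSeq[byte](200), newSeq[byte](40), newSeq[byte](45),
               newSeq[byte](50), newSeq[byte](30)]
  for batch in splitBySize(msgs, 100):
    echo "batch with ", batch.len, " message(s)"

As in the patch, a message that cannot fit even into an empty batch is dropped (the real code logs a trace message instead of failing), and the 1.1 factor is a heuristic allowance for protobuf overhead rather than an exact bound.
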
82 changes: 56 additions & 26 deletions libp2p/protocols/pubsub/rpc/messages.nim
@@ -9,7 +9,7 @@

{.push raises: [].}

import options, sequtils
import options, sequtils, sugar
import "../../.."/[
peerid,
routing_record,
@@ -18,6 +18,14 @@ import "../../.."/[

export options

proc expectedFields[T](t: typedesc[T], existingFieldNames: seq[string]) {.raises: [CatchableError].} =
var fieldNames: seq[string]
for name, _ in fieldPairs(T()):
fieldNames &= name
if fieldNames != existingFieldNames:
fieldNames.keepIf(proc(it: string): bool = it notin existingFieldNames)
raise newException(CatchableError, $T & " fields changed, please search for and revise all relevant procs. New fields: " & $fieldNames)

type
PeerInfoMsg* = object
peerId*: PeerId
@@ -117,31 +125,53 @@ func shortLog*(m: RPCMsg): auto =
control: m.control.get(ControlMessage()).shortLog
)

static: expectedFields(PeerInfoMsg, @["peerId", "signedPeerRecord"])
proc byteSize(peerInfo: PeerInfoMsg): int =
peerInfo.peerId.len + peerInfo.signedPeerRecord.len

static: expectedFields(SubOpts, @["subscribe", "topic"])
proc byteSize(subOpts: SubOpts): int =
1 + subOpts.topic.len # 1 byte for the bool

static: expectedFields(Message, @["fromPeer", "data", "seqno", "topicIds", "signature", "key"])
proc byteSize*(msg: Message): int =
var total = 0
total += msg.fromPeer.len
total += msg.data.len
total += msg.seqno.len
total += msg.signature.len
total += msg.key.len
for topicId in msg.topicIds:
total += topicId.len
return total
msg.fromPeer.len + msg.data.len + msg.seqno.len +
msg.signature.len + msg.key.len + msg.topicIds.foldl(a + b.len, 0)

proc byteSize*(msgs: seq[Message]): int =
msgs.mapIt(byteSize(it)).foldl(a + b, 0)

proc byteSize*(ihave: seq[ControlIHave]): int =
var total = 0
for item in ihave:
total += item.topicId.len
for msgId in item.messageIds:
total += msgId.len
return total

proc byteSize*(iwant: seq[ControlIWant]): int =
var total = 0
for item in iwant:
for msgId in item.messageIds:
total += msgId.len
return total
msgs.foldl(a + b.byteSize, 0)

static: expectedFields(ControlIHave, @["topicId", "messageIds"])
proc byteSize(controlIHave: ControlIHave): int =
controlIHave.topicId.len + controlIHave.messageIds.foldl(a + b.len, 0)

proc byteSize*(ihaves: seq[ControlIHave]): int =
ihaves.foldl(a + b.byteSize, 0)

static: expectedFields(ControlIWant, @["messageIds"])
proc byteSize(controlIWant: ControlIWant): int =
controlIWant.messageIds.foldl(a + b.len, 0)

proc byteSize*(iwants: seq[ControlIWant]): int =
iwants.foldl(a + b.byteSize, 0)

static: expectedFields(ControlGraft, @["topicId"])
proc byteSize(controlGraft: ControlGraft): int =
controlGraft.topicId.len

static: expectedFields(ControlPrune, @["topicId", "peers", "backoff"])
proc byteSize(controlPrune: ControlPrune): int =
controlPrune.topicId.len + controlPrune.peers.foldl(a + b.byteSize, 0) + 8 # 8 bytes for uint64

static: expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant"])
proc byteSize(control: ControlMessage): int =
control.ihave.foldl(a + b.byteSize, 0) + control.iwant.foldl(a + b.byteSize, 0) +
control.graft.foldl(a + b.byteSize, 0) + control.prune.foldl(a + b.byteSize, 0) +
control.idontwant.foldl(a + b.byteSize, 0)

static: expectedFields(RPCMsg, @["subscriptions", "messages", "control", "ping", "pong"])
proc byteSize*(rpc: RPCMsg): int =
result = rpc.subscriptions.foldl(a + b.byteSize, 0) + byteSize(rpc.messages) +
rpc.ping.len + rpc.pong.len
rpc.control.withValue(ctrl):
result += ctrl.byteSize
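
Beyond the reshuffled byteSize helpers, the notable addition in this file is the static: expectedFields(...) guard: at compile time it compares an object's field names against the list its byteSize was written for, so adding a field to any RPC type without updating the size accounting breaks the build. A self-contained sketch of the same pattern applied to a toy type (the type and field names below are illustrative, not from the library):

import sequtils

proc expectedFields[T](t: typedesc[T], existingFieldNames: seq[string]) {.raises: [CatchableError].} =
  # Collect the declared field names and fail loudly if they drift from the
  # list the size calculation was written against.
  var fieldNames: seq[string]
  for name, _ in fieldPairs(T()):
    fieldNames &= name
  if fieldNames != existingFieldNames:
    fieldNames.keepIf(proc(it: string): bool = it notin existingFieldNames)
    raise newException(CatchableError,
      $T & " fields changed, please revise byteSize. New fields: " & $fieldNames)

type Sample = object
  id: string
  payload: seq[byte]

static: expectedFields(Sample, @["id", "payload"])  # fails the build if Sample gains a field

proc byteSize(s: Sample): int =
  s.id.len + s.payload.len

The fixed constants in the helpers above (8 bytes for the uint64 backoff in ControlPrune, 1 byte for the bool in SubOpts) are rough per-field estimates rather than exact protobuf varint sizes; the 10% slack in splitRPCMsg absorbs that kind of imprecision.
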
162 changes: 139 additions & 23 deletions tests/pubsub/testgossipinternal.nim
@@ -1,42 +1,31 @@
include ../../libp2p/protocols/pubsub/gossipsub
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

{.used.}

import std/[options, deques]
import std/[options, deques, sequtils, enumerate, algorithm]
import stew/byteutils
import ../../libp2p/builders
import ../../libp2p/errors
import ../../libp2p/crypto/crypto
import ../../libp2p/stream/bufferstream
import ../../libp2p/protocols/pubsub/[pubsub, gossipsub, mcache, mcache, peertable]
import ../../libp2p/protocols/pubsub/rpc/[message, messages]
import ../../libp2p/switch
import ../../libp2p/muxers/muxer
import ../../libp2p/protocols/pubsub/rpc/protobuf
import utils

import ../helpers

type
TestGossipSub = ref object of GossipSub

proc noop(data: seq[byte]) {.async, gcsafe.} = discard

proc getPubSubPeer(p: TestGossipSub, peerId: PeerId): PubSubPeer =
proc getConn(): Future[Connection] =
p.switch.dial(peerId, GossipSubCodec)

let pubSubPeer = PubSubPeer.new(peerId, getConn, nil, GossipSubCodec, 1024 * 1024, Opt.some(TokenBucket.new(1024, 500.milliseconds)))
debug "created new pubsub peer", peerId

p.peers[peerId] = pubSubPeer

onNewPeer(p, pubSubPeer)
pubSubPeer

proc randomPeerId(): PeerId =
try:
PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet()
except CatchableError as exc:
raise newException(Defect, exc.msg)

const MsgIdSuccess = "msg id gen success"

suite "GossipSub internal":
@@ -826,3 +815,130 @@ suite "GossipSub internal":

await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()

proc setupTest(): Future[tuple[gossip0: GossipSub, gossip1: GossipSub, receivedMessages: ref HashSet[seq[byte]]]] {.async.} =
let
nodes = generateNodes(2, gossip = true, verifySignature = false)
discard await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start()
)

await nodes[1].switch.connect(nodes[0].switch.peerInfo.peerId, nodes[0].switch.peerInfo.addrs)

var receivedMessages = new(HashSet[seq[byte]])

proc handlerA(topic: string, data: seq[byte]) {.async, gcsafe.} =
receivedMessages[].incl(data)

proc handlerB(topic: string, data: seq[byte]) {.async, gcsafe.} =
discard

nodes[0].subscribe("foobar", handlerA)
nodes[1].subscribe("foobar", handlerB)
await waitSubGraph(nodes, "foobar")

var gossip0: GossipSub = GossipSub(nodes[0])
var gossip1: GossipSub = GossipSub(nodes[1])

return (gossip0, gossip1, receivedMessages)

proc teardownTest(gossip0: GossipSub, gossip1: GossipSub) {.async.} =
await allFuturesThrowing(
gossip0.switch.stop(),
gossip1.switch.stop()
)

proc createMessages(gossip0: GossipSub, gossip1: GossipSub, size1: int, size2: int): tuple[iwantMessageIds: seq[MessageId], sentMessages: HashSet[seq[byte]]] =
var iwantMessageIds = newSeq[MessageId]()
var sentMessages = initHashSet[seq[byte]]()

for i, size in enumerate([size1, size2]):
let data = newSeqWith[byte](size, i.byte)
sentMessages.incl(data)

let msg = Message.init(gossip1.peerInfo.peerId, data, "foobar", some(uint64(i + 1)))
let iwantMessageId = gossip1.msgIdProvider(msg).expect(MsgIdSuccess)
iwantMessageIds.add(iwantMessageId)
gossip1.mcache.put(iwantMessageId, msg)

let peer = gossip1.peers[(gossip0.peerInfo.peerId)]
peer.sentIHaves[^1].incl(iwantMessageId)

return (iwantMessageIds, sentMessages)

asyncTest "e2e - Split IWANT replies when individual messages are below maxSize but combined exceed maxSize":
# This test checks if two messages, each below the maxSize, are correctly split when their combined size exceeds maxSize.
# Expected: Both messages should be received.
let (gossip0, gossip1, receivedMessages) = await setupTest()

let messageSize = gossip1.maxMessageSize div 2 + 1
let (iwantMessageIds, sentMessages) = createMessages(gossip0, gossip1, messageSize, messageSize)

gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
ihave: @[ControlIHave(topicId: "foobar", messageIds: iwantMessageIds)]
))))

checkExpiring: receivedMessages[] == sentMessages
check receivedMessages[].len == 2

await teardownTest(gossip0, gossip1)

asyncTest "e2e - Discard IWANT replies when both messages individually exceed maxSize":
# This test checks if two messages, each exceeding the maxSize, are discarded and not sent.
# Expected: No messages should be received.
let (gossip0, gossip1, receivedMessages) = await setupTest()

let messageSize = gossip1.maxMessageSize + 10
let (bigIWantMessageIds, sentMessages) = createMessages(gossip0, gossip1, messageSize, messageSize)

gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
))))

await sleepAsync(300.milliseconds)
checkExpiring: receivedMessages[].len == 0

await teardownTest(gossip0, gossip1)

asyncTest "e2e - Process IWANT replies when both messages are below maxSize":
# This test checks if two messages, both below the maxSize, are correctly processed and sent.
# Expected: Both messages should be received.
let (gossip0, gossip1, receivedMessages) = await setupTest()
let size1 = gossip1.maxMessageSize div 2
let size2 = gossip1.maxMessageSize div 3
let (bigIWantMessageIds, sentMessages) = createMessages(gossip0, gossip1, size1, size2)

gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
))))

checkExpiring: receivedMessages[] == sentMessages
check receivedMessages[].len == 2

await teardownTest(gossip0, gossip1)

asyncTest "e2e - Split IWANT replies when one message is below maxSize and the other exceeds maxSize":
# This test checks if, when given two messages where one is below maxSize and the other exceeds it, only the smaller message is processed and sent.
# Expected: Only the smaller message should be received.
let (gossip0, gossip1, receivedMessages) = await setupTest()
let maxSize = gossip1.maxMessageSize
let size1 = maxSize div 2
let size2 = maxSize + 10
let (bigIWantMessageIds, sentMessages) = createMessages(gossip0, gossip1, size1, size2)

gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
))))

var smallestSet: HashSet[seq[byte]]
let seqs = toSeq(sentMessages)
if seqs[0] < seqs[1]:
smallestSet.incl(seqs[0])
else:
smallestSet.incl(seqs[1])

checkExpiring: receivedMessages[] == smallestSet
check receivedMessages[].len == 1

await teardownTest(gossip0, gossip1)
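
The message sizes in these tests are chosen to hit each branch of the split logic: div 2 + 1 makes both messages fit individually while their combined IWANT reply exceeds the limit, maxMessageSize + 10 makes a message unsendable on its own, and the last test mixes the two. A short arithmetic check of the first case, assuming for illustration a 1 MiB maxMessageSize (the exact value does not matter for the argument):

when isMainModule:
  let maxSize = 1024 * 1024        # illustrative limit only
  let each = maxSize div 2 + 1     # 524_289 bytes
  assert each <= maxSize           # either message fits in an RPCMsg on its own...
  assert each + each > maxSize     # ...but a reply carrying both must be split
  echo "split required: ", each + each, " > ", maxSize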