Skip to content

Commit

Permalink
feat: idontwant on publish
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Jan 10, 2025
1 parent 1fa30f0 commit 4a64c40
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 21 deletions.
62 changes: 41 additions & 21 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ proc init*(
overheadRateLimit = Opt.none(tuple[bytes: int, interval: Duration]),
disconnectPeerAboveRateLimit = false,
maxNumElementsInNonPriorityQueue = DefaultMaxNumElementsInNonPriorityQueue,
sendIDontWantOnPublish = false,
): GossipSubParams =
GossipSubParams(
explicit: true,
Expand Down Expand Up @@ -139,6 +140,7 @@ proc init*(
overheadRateLimit: overheadRateLimit,
disconnectPeerAboveRateLimit: disconnectPeerAboveRateLimit,
maxNumElementsInNonPriorityQueue: maxNumElementsInNonPriorityQueue,
sendIDontWantOnPublish: sendIDontWantOnPublish,
)

proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
Expand Down Expand Up @@ -383,6 +385,40 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
trace "sending iwant reply messages", peer
g.send(peer, RPCMsg(messages: messages), isHighPriority = false)

proc sendIDontWant(
g: GossipSub,
msg: Message,
msgId: MessageId,
peersToSendIDontWant: HashSet[PubSubPeer],
) =
# If the message is "large enough", let the mesh know that we do not want
# any more copies of it, regardless if it is valid or not.
#
# In the case that it is not valid, this leads to some redundancy
# (since the other peer should not send us an invalid message regardless),
# but the expectation is that this is rare (due to such peers getting
# descored) and that the savings from honest peers are greater than the
# cost a dishonest peer can incur in short time (since the IDONTWANT is
# small).

# IDONTWANT is only supported by >= GossipSubCodec_12
let peers = peersToSendIDontWant.filterIt(
it.codec != GossipSubCodec_10 and it.codec != GossipSubCodec_11
)

g.broadcast(
peers,
RPCMsg(
control: some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
),
isHighPriority = true,
)

const iDontWantMessageSizeThreshold* = 512

proc isLargeMessage(msg: Message, msgId: MessageId): bool =
msg.data.len > max(iDontWantMessageSizeThreshold, msgId.len * 10)

proc validateAndRelay(
g: GossipSub, msg: Message, msgId: MessageId, saltedId: SaltedId, peer: PubSubPeer
) {.async.} =
Expand All @@ -399,29 +435,10 @@ proc validateAndRelay(
toSendPeers.incl(peers[])
toSendPeers.excl(peer)

if msg.data.len > max(512, msgId.len * 10):
# If the message is "large enough", let the mesh know that we do not want
# any more copies of it, regardless if it is valid or not.
#
# In the case that it is not valid, this leads to some redundancy
# (since the other peer should not send us an invalid message regardless),
# but the expectation is that this is rare (due to such peers getting
# descored) and that the savings from honest peers are greater than the
# cost a dishonest peer can incur in short time (since the IDONTWANT is
# small).
if isLargeMessage(msg, msgId):
var peersToSendIDontWant = HashSet[PubSubPeer]()
addToSendPeers(peersToSendIDontWant)
peersToSendIDontWant.exclIfIt(
it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11
)
g.broadcast(
peersToSendIDontWant,
RPCMsg(
control:
some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
),
isHighPriority = true,
)
g.sendIDontWant(msg, msgId, peersToSendIDontWant)

let validation = await g.validate(msg)

Expand Down Expand Up @@ -782,6 +799,9 @@ method publish*(g: GossipSub, topic: string, data: seq[byte]): Future[int] {.asy

g.mcache.put(msgId, msg)

if g.parameters.sendIDontWantOnPublish and isLargeMessage(msg, msgId):
g.sendIDontWant(msg, msgId, peers)

g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)

if g.knownTopics.contains(topic):
Expand Down
3 changes: 3 additions & 0 deletions libp2p/protocols/pubsub/gossipsub/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ type
# Max number of elements allowed in the non-priority queue. When this limit has been reached, the peer will be disconnected.
maxNumElementsInNonPriorityQueue*: int

# Broadcast an IDONTWANT message automatically when the message exceeds the IDONTWANT message size threshold
sendIDontWantOnPublish*: bool

BackoffTable* = Table[string, Table[PeerId, Moment]]
ValidationSeenTable* = Table[SaltedId, HashSet[PubSubPeer]]

Expand Down
8 changes: 8 additions & 0 deletions libp2p/utility.nim
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,11 @@ template exclIfIt*[T](set: var HashSet[T], condition: untyped) =
if condition:
toExcl.incl(it)
set.excl(toExcl)

template filterIt*[T](set: HashSet[T], condition: untyped): HashSet[T] =
var filtered = HashSet[T]()
if set.len != 0:
for it {.inject.} in set:
if condition:
filtered.incl(it)
filtered
33 changes: 33 additions & 0 deletions tests/pubsub/testgossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,39 @@ suite "GossipSub":

await allFuturesThrowing(nodesFut.concat())

asyncTest "e2e - iDontWant is broadcasted on publish":
func dumbMsgIdProvider(m: Message): Result[MessageId, ValidationResult] =
ok(newSeq[byte](10))
let
nodes = generateNodes(2, gossip = true, msgIdProvider = dumbMsgIdProvider)

nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())

await nodes[0].switch.connect(
nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs
)

proc handlerA(topic: string, data: seq[byte]) {.async.} =
discard

proc handlerB(topic: string, data: seq[byte]) {.async.} =
discard

nodes[0].subscribe("foobar", handlerA)
nodes[1].subscribe("foobar", handlerB)
await waitSubGraph(nodes, "foobar")

var gossip2: GossipSub = GossipSub(nodes[1])

tryPublish await nodes[0].publish("foobar", newSeq[byte](10000)), 1

checkUntilTimeout:
gossip2.mesh.getOrDefault("foobar").anyIt(it.iDontWants[^1].len == 1)

await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())

await allFuturesThrowing(nodesFut.concat())

asyncTest "e2e - iDontWant is sent only for 1.2":
# 3 nodes: A <=> B <=> C
# (A & C are NOT connected). We pre-emptively send a dontwant from C to B,
Expand Down

0 comments on commit 4a64c40

Please sign in to comment.