-
Notifications
You must be signed in to change notification settings - Fork 54
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
261 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
# Nim-LibP2P | ||
# Copyright (c) 2023 Status Research & Development GmbH | ||
# Licensed under either of | ||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) | ||
# * MIT license ([LICENSE-MIT](LICENSE-MIT)) | ||
# at your option. | ||
# This file may not be copied, modified, or distributed except according to | ||
# those terms. | ||
|
||
when (NimMajor, NimMinor) < (1, 4): | ||
{.push raises: [Defect].} | ||
else: | ||
{.push raises: [].} | ||
|
||
import chronos, chronicles, times, tables, sequtils, options | ||
import ../switch, | ||
../protocols/connectivity/relay/[client, utils] | ||
|
||
logScope: | ||
topics = "libp2p autorelay" | ||
|
||
type | ||
OnReservationHandler = proc (addresses: seq[MultiAddress]) {.gcsafe, raises: [Defect].} | ||
|
||
AutoRelayService* = ref object of Service | ||
running: bool | ||
runner: Future[void] | ||
client: RelayClient | ||
numRelays: int | ||
relayPeers: Table[PeerId, Future[void]] | ||
relayAddresses: Table[PeerId, seq[MultiAddress]] | ||
backingOff: seq[PeerId] | ||
peerAvailable: AsyncEvent | ||
onReservation: OnReservationHandler | ||
rng: ref HmacDrbgContext | ||
|
||
proc reserveAndUpdate(self: AutoRelayService, relayPid: PeerId, selfPid: PeerId) {.async.} = | ||
while self.running: | ||
let | ||
rsvp = await self.client.reserve(relayPid).wait(chronos.seconds(5)) | ||
relayedAddr = rsvp.addrs.mapIt( | ||
MultiAddress.init($it & "/p2p-circuit/p2p/" & $selfPid).tryGet()) | ||
ttl = rsvp.expire.int64 - times.now().utc.toTime.toUnix | ||
if ttl <= 60: | ||
# A reservation under a minute is basically useless | ||
break | ||
if relayPid notin self.relayAddresses or self.relayAddresses[relayPid] != relayedAddr: | ||
self.relayAddresses[relayPid] = relayedAddr | ||
if not self.onReservation.isNil(): | ||
self.onReservation(concat(toSeq(self.relayAddresses.values))) | ||
await sleepAsync chronos.seconds(ttl - 30) | ||
|
||
method setup*(self: AutoRelayService, switch: Switch): Future[bool] {.async, gcsafe.} = | ||
let hasBeenSetUp = await procCall Service(self).setup(switch) | ||
if hasBeenSetUp: | ||
proc handlePeerJoined(peerId: PeerId, event: PeerEvent) {.async.} = | ||
trace "Peer Joined", peerId | ||
if self.relayPeers.len < self.numRelays: | ||
self.peerAvailable.fire() | ||
proc handlePeerLeft(peerId: PeerId, event: PeerEvent) {.async.} = | ||
trace "Peer Left", peerId | ||
self.relayPeers.withValue(peerId, future): | ||
future[].cancel() | ||
switch.addPeerEventHandler(handlePeerJoined, Joined) | ||
switch.addPeerEventHandler(handlePeerLeft, Left) | ||
await self.run(switch) | ||
return hasBeenSetUp | ||
|
||
proc manageBackedOff(self: AutoRelayService, pid: PeerId) {.async.} = | ||
await sleepAsync(chronos.seconds(5)) | ||
self.backingOff.keepItIf(it != pid) | ||
self.peerAvailable.fire() | ||
|
||
proc innerRun(self: AutoRelayService, switch: Switch) {.async, gcsafe.} = | ||
while true: | ||
# Remove relayPeers that failed | ||
let peers = toSeq(self.relayPeers.keys()) | ||
for k in peers: | ||
if self.relayPeers[k].finished(): | ||
self.relayPeers.del(k) | ||
self.relayAddresses.del(k) | ||
if not self.onReservation.isNil(): | ||
self.onReservation(concat(toSeq(self.relayAddresses.values))) | ||
# To avoid ddosing our peers in certain conditions | ||
self.backingOff.add(k) | ||
asyncSpawn self.manageBackedOff(k) | ||
|
||
# Get all connected relayPeers | ||
self.peerAvailable.clear() | ||
var connectedPeers = switch.connectedPeers(Direction.Out) | ||
connectedPeers.keepItIf(RelayV2HopCodec in switch.peerStore[ProtoBook][it] and | ||
it notin self.relayPeers and | ||
it notin self.backingOff) | ||
self.rng.shuffle(connectedPeers) | ||
|
||
for relayPid in connectedPeers: | ||
if self.relayPeers.len() >= self.numRelays: | ||
break | ||
self.relayPeers[relayPid] = self.reserveAndUpdate(relayPid, switch.peerInfo.peerId) | ||
|
||
if self.relayPeers.len() > 0: | ||
await one(toSeq(self.relayPeers.values())) or self.peerAvailable.wait() | ||
else: | ||
await self.peerAvailable.wait() | ||
await sleepAsync(200.millis) | ||
|
||
method run*(self: AutoRelayService, switch: Switch) {.async, gcsafe.} = | ||
if self.running: | ||
trace "Autorelay is already running" | ||
return | ||
self.running = true | ||
self.runner = self.innerRun(switch) | ||
|
||
method stop*(self: AutoRelayService, switch: Switch): Future[bool] {.async, gcsafe.} = | ||
let hasBeenStopped = await procCall Service(self).stop(switch) | ||
if hasBeenStopped: | ||
self.running = false | ||
self.runner.cancel() | ||
return hasBeenStopped | ||
|
||
proc getAddresses*(self: AutoRelayService): seq[MultiAddress] = | ||
result = concat(toSeq(self.relayAddresses.values)) | ||
|
||
proc new*(T: typedesc[AutoRelayService], | ||
numRelays: int, | ||
client: RelayClient, | ||
onReservation: OnReservationHandler, | ||
rng: ref HmacDrbgContext): T = | ||
T(numRelays: numRelays, | ||
client: client, | ||
onReservation: onReservation, | ||
peerAvailable: newAsyncEvent(), | ||
rng: rng) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
{.used.} | ||
|
||
import chronos, options | ||
import ../libp2p | ||
import ../libp2p/[crypto/crypto, | ||
protocols/connectivity/relay/relay, | ||
protocols/connectivity/relay/messages, | ||
protocols/connectivity/relay/utils, | ||
protocols/connectivity/relay/client, | ||
services/autorelayservice] | ||
import ./helpers | ||
import stew/byteutils | ||
|
||
proc createSwitch(r: Relay, autorelay: Service = nil): Switch = | ||
var builder = SwitchBuilder.new() | ||
.withRng(newRng()) | ||
.withAddresses(@[ MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ]) | ||
.withTcpTransport() | ||
.withMplex() | ||
.withNoise() | ||
.withCircuitRelay(r) | ||
if not autorelay.isNil(): | ||
builder = builder.withServices(@[autorelay]) | ||
builder.build() | ||
|
||
|
||
proc buildRelayMA(switchRelay: Switch, switchClient: Switch): MultiAddress = | ||
MultiAddress.init($switchRelay.peerInfo.addrs[0] & "/p2p/" & | ||
$switchRelay.peerInfo.peerId & "/p2p-circuit/p2p/" & | ||
$switchClient.peerInfo.peerId).get() | ||
|
||
suite "Autorelay": | ||
asyncTeardown: | ||
checkTrackers() | ||
|
||
var | ||
switchRelay {.threadvar.}: Switch | ||
switchClient {.threadvar.}: Switch | ||
relayClient {.threadvar.}: RelayClient | ||
autorelay {.threadvar.}: AutoRelayService | ||
|
||
asyncTest "Simple test": | ||
switchRelay = createSwitch(Relay.new()) | ||
relayClient = RelayClient.new() | ||
let fut = newFuture[void]() | ||
proc checkMA(addresses: seq[MultiAddress]) = | ||
check: addresses[0] == buildRelayMA(switchRelay, switchClient) | ||
check: addresses.len() == 1 | ||
fut.complete() | ||
autorelay = AutoRelayService.new(3, relayClient, checkMA, newRng()) | ||
switchClient = createSwitch(relayClient, autorelay) | ||
await allFutures(switchClient.start(), switchRelay.start()) | ||
await switchClient.connect(switchRelay.peerInfo.peerId, switchRelay.peerInfo.addrs) | ||
await fut.wait(1.seconds) | ||
let addresses = autorelay.getAddresses() | ||
check: | ||
addresses[0] == buildRelayMA(switchRelay, switchClient) | ||
addresses.len() == 1 | ||
await allFutures(switchClient.stop(), switchRelay.stop()) | ||
|
||
asyncTest "Connect after starting switches": | ||
switchRelay = createSwitch(Relay.new()) | ||
relayClient = RelayClient.new() | ||
let fut = newFuture[void]() | ||
proc checkMA(address: seq[MultiAddress]) = | ||
check: address[0] == buildRelayMA(switchRelay, switchClient) | ||
fut.complete() | ||
let autorelay = AutoRelayService.new(3, relayClient, checkMA, newRng()) | ||
switchClient = createSwitch(relayClient, autorelay) | ||
await allFutures(switchClient.start(), switchRelay.start()) | ||
await sleepAsync(500.millis) | ||
await switchClient.connect(switchRelay.peerInfo.peerId, switchRelay.peerInfo.addrs) | ||
await fut.wait(1.seconds) | ||
let addresses = autorelay.getAddresses() | ||
check: | ||
addresses[0] == buildRelayMA(switchRelay, switchClient) | ||
addresses.len() == 1 | ||
await allFutures(switchClient.stop(), switchRelay.stop()) | ||
|
||
asyncTest "Three relays connections": | ||
var state = 0 | ||
let | ||
rel1 = createSwitch(Relay.new()) | ||
rel2 = createSwitch(Relay.new()) | ||
rel3 = createSwitch(Relay.new()) | ||
fut = newFuture[void]() | ||
relayClient = RelayClient.new() | ||
proc checkMA(addresses: seq[MultiAddress]) = | ||
if state == 0 or state == 2: | ||
check: | ||
buildRelayMA(rel1, switchClient) in addresses | ||
addresses.len() == 1 | ||
state += 1 | ||
elif state == 1: | ||
check: | ||
buildRelayMA(rel1, switchClient) in addresses | ||
buildRelayMA(rel2, switchClient) in addresses | ||
addresses.len() == 2 | ||
state += 1 | ||
elif state == 3: | ||
check: | ||
buildRelayMA(rel1, switchClient) in addresses | ||
buildRelayMA(rel3, switchClient) in addresses | ||
addresses.len() == 2 | ||
state += 1 | ||
fut.complete() | ||
let autorelay = AutoRelayService.new(2, relayClient, checkMA, newRng()) | ||
switchClient = createSwitch(relayClient, autorelay) | ||
await allFutures(switchClient.start(), rel1.start(), rel2.start(), rel3.start()) | ||
await switchClient.connect(rel1.peerInfo.peerId, rel1.peerInfo.addrs) | ||
await sleepAsync(500.millis) | ||
await switchClient.connect(rel2.peerInfo.peerId, rel2.peerInfo.addrs) | ||
await switchClient.connect(rel3.peerInfo.peerId, rel3.peerInfo.addrs) | ||
await sleepAsync(500.millis) | ||
await rel2.stop() | ||
await fut.wait(1.seconds) | ||
let addresses = autorelay.getAddresses() | ||
check: | ||
buildRelayMA(rel1, switchClient) in addresses | ||
buildRelayMA(rel3, switchClient) in addresses | ||
addresses.len() == 2 | ||
await allFutures(switchClient.stop(), rel1.stop(), rel3.stop()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters