Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add custom talkreq implementation #464

Merged
merged 2 commits into from
Jan 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eth/p2p/discoveryv5/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ proc updateRecord*(
# TODO: Would it make sense to actively ping ("broadcast") to all the peers
# we stored a handshake with in order to get that ENR updated?

proc send(d: Protocol, a: Address, data: seq[byte]) =
proc send*(d: Protocol, a: Address, data: seq[byte]) =
let ta = initTAddress(a.ip, a.port)
let f = d.transp.sendTo(ta, data)
f.callback = proc(data: pointer) {.gcsafe.} =
Expand Down
92 changes: 55 additions & 37 deletions eth/utp/utp_discv5_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
{.push raises: [Defect].}

import
std/[hashes],
std/[hashes, sugar],
chronos, chronicles,
../p2p/discoveryv5/protocol,
../p2p/discoveryv5/[protocol, messages, encoding],
./utp_router,
../keys

Expand All @@ -18,64 +18,82 @@ export utp_router, protocol, chronicles
logScope:
topics = "utp_discv5_protocol"

type UtpDiscv5Protocol* = ref object of TalkProtocol
prot: protocol.Protocol
router: UtpRouter[Node]
type
NodeAddress* = object
nodeId*: NodeId
address*: Address

proc hash(x: UtpSocketKey[Node]): Hash =
UtpDiscv5Protocol* = ref object of TalkProtocol
prot: protocol.Protocol
router: UtpRouter[NodeAddress]

proc init*(T: type NodeAddress, nodeId: NodeId, address: Address): NodeAddress =
NodeAddress(nodeId: nodeId, address: address)

proc init*(T: type NodeAddress, node: Node): Option[NodeAddress] =
node.address.map((address: Address) => NodeAddress(nodeId: node.id, address: address))

proc hash(x: NodeAddress): Hash =
var h = 0
h = h !& x.nodeId.hash
h = h !& x.address.hash
!$h

proc hash(x: UtpSocketKey[NodeAddress]): Hash =
var h = 0
h = h !& x.remoteAddress.hash
h = h !& x.rcvId.hash
!$h

func `$`*(x: UtpSocketKey[Node]): string =
"(remoteId: " & $x.remoteAddress.id &
func `$`*(x: UtpSocketKey[NodeAddress]): string =
"(remoteId: " & $x.remoteAddress.nodeId &
", remoteAddress: " & $x.remoteAddress.address &
", rcvId: "& $x.rcvId &
")"

proc talkReqDirect(p: protocol.Protocol, n: NodeAddress, protocol, request: seq[byte]): Future[void] =
let
reqId = RequestId.init(p.rng[])
message = encodeMessage(TalkReqMessage(protocol: protocol, request: request), reqId)

(data, nonce) = encodeMessagePacket(p.rng[], p.codec, n.nodeId, n.address, message)

trace "Send message packet", dstId = n.nodeId, address = n.address, kind = MessageKind.talkreq
p.send(n.address, data)

proc initSendCallback(
t: protocol.Protocol, subProtocolName: seq[byte]): SendCallback[Node] =
t: protocol.Protocol, subProtocolName: seq[byte]): SendCallback[NodeAddress] =
return (
proc (to: Node, data: seq[byte]): Future[void] =
proc (to: NodeAddress, data: seq[byte]): Future[void] =
let fut = newFuture[void]()
# TODO: In discovery v5 each talkreq waits for a talkresp, but here we
# would really like the fire and forget semantics (similar to udp).
# For now start talkreq/talkresp in background, and discard its result.
# That way we also lose information about any possible errors.
# Consider adding talkreq proc which does not wait for the response.
discard t.talkreq(to, subProtocolName, data)
# hidden assumption here is that nodes already have established discv5 session
# between each other. In our use case this should be true as openning stream
# is only done after succesful OFFER/ACCEPT or FINDCONTENT/CONTENT exchange
# which forces nodes to establish session between each other.
discard t.talkReqDirect(to, subProtocolName, data)
fut.complete()
return fut
)

proc messageHandler(protocol: TalkProtocol, request: seq[byte],
srcId: NodeId, srcUdpAddress: Address): seq[byte] =
let p = UtpDiscv5Protocol(protocol)
let maybeSender = p.prot.getNode(srcId)

if maybeSender.isSome():
debug "Received utp payload from known node. Start processing"
let sender = maybeSender.unsafeGet()
# processIncomingBytes may respond to remote by using talkreq requests
asyncSpawn p.router.processIncomingBytes(request, sender)
# We always send empty responses as discv5 spec requires that talkreq
# always receives a talkresp.
@[]
else:
debug "Received utp payload from unknown node. Ignore"
@[]
let
p = UtpDiscv5Protocol(protocol)
nodeAddress = NodeAddress.init(srcId, srcUdpAddress)
debug "Received utp payload from known node. Start processing",
nodeId = nodeAddress.nodeId, address = nodeAddress.address
asyncSpawn p.router.processIncomingBytes(request, nodeAddress)

proc new*(
T: type UtpDiscv5Protocol,
p: protocol.Protocol,
subProtocolName: seq[byte],
acceptConnectionCb: AcceptConnectionCallback[Node],
allowConnectionCb: AllowConnectionCallback[Node] = nil,
acceptConnectionCb: AcceptConnectionCallback[NodeAddress],
allowConnectionCb: AllowConnectionCallback[NodeAddress] = nil,
socketConfig: SocketConfig = SocketConfig.init()): UtpDiscv5Protocol =
doAssert(not(isNil(acceptConnectionCb)))

let router = UtpRouter[Node].new(
let router = UtpRouter[NodeAddress].new(
acceptConnectionCb,
allowConnectionCb,
socketConfig,
Expand All @@ -94,12 +112,12 @@ proc new*(
)
prot

proc connectTo*(r: UtpDiscv5Protocol, address: Node):
Future[ConnectionResult[Node]] =
proc connectTo*(r: UtpDiscv5Protocol, address: NodeAddress):
Future[ConnectionResult[NodeAddress]] =
return r.router.connectTo(address)

proc connectTo*(r: UtpDiscv5Protocol, address: Node, connectionId: uint16):
Future[ConnectionResult[Node]] =
proc connectTo*(r: UtpDiscv5Protocol, address: NodeAddress, connectionId: uint16):
Future[ConnectionResult[NodeAddress]] =
return r.router.connectTo(address, connectionId)

proc shutdown*(r: UtpDiscv5Protocol) =
Expand Down
47 changes: 22 additions & 25 deletions tests/utp/test_discv5_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
{.used.}

import
std/options,
chronos, bearssl,
stew/shims/net, stew/byteutils,
testutils/unittests,
Expand Down Expand Up @@ -48,23 +49,23 @@ procSuite "Utp protocol over discovery v5 tests":
let rng = newRng()
let utpProtId = "test-utp".toBytes()

proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnectionCallback[Node] =
proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnectionCallback[NodeAddress] =
return (
proc(server: UtpRouter[Node], client: UtpSocket[Node]): Future[void] =
proc(server: UtpRouter[NodeAddress], client: UtpSocket[NodeAddress]): Future[void] =
serverSockets.addLast(client)
)

proc allowOneIdCallback(allowedId: uint16): AllowConnectionCallback[Node] =
proc allowOneIdCallback(allowedId: uint16): AllowConnectionCallback[NodeAddress] =
return (
proc(r: UtpRouter[Node], remoteAddress: Node, connectionId: uint16): bool =
proc(r: UtpRouter[NodeAddress], remoteAddress: NodeAddress, connectionId: uint16): bool =
connectionId == allowedId
)

# TODO Add more tests to discovery v5 suite, especially those which will differ
# from standard utp case
asyncTest "Success connect to remote host":
let
queue = newAsyncQueue[UtpSocket[Node]]()
queue = newAsyncQueue[UtpSocket[NodeAddress]]()
node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302))
node2 = initDiscoveryNode(
Expand All @@ -73,12 +74,11 @@ procSuite "Utp protocol over discovery v5 tests":
utp1 = UtpDiscv5Protocol.new(node1, utpProtId, registerIncomingSocketCallback(queue))
utp2 = UtpDiscv5Protocol.new(node2, utpProtId, registerIncomingSocketCallback(queue))

# nodes must know about each other
# nodes must have session between each other
check:
node1.addNode(node2.localNode)
node2.addNode(node1.localNode)
(await node1.ping(node2.localNode)).isOk()

let clientSocketResult = await utp1.connectTo(node2.localNode)
let clientSocketResult = await utp1.connectTo(NodeAddress.init(node2.localNode).unsafeGet())
let clientSocket = clientSocketResult.get()
let serverSocket = await queue.get()

Expand All @@ -95,7 +95,7 @@ procSuite "Utp protocol over discovery v5 tests":

asyncTest "Success write data over packet size to remote host":
let
queue = newAsyncQueue[UtpSocket[Node]]()
queue = newAsyncQueue[UtpSocket[NodeAddress]]()
node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302))
node2 = initDiscoveryNode(
Expand All @@ -104,13 +104,12 @@ procSuite "Utp protocol over discovery v5 tests":
utp1 = UtpDiscv5Protocol.new(node1, utpProtId, registerIncomingSocketCallback(queue))
utp2 = UtpDiscv5Protocol.new(node2, utpProtId, registerIncomingSocketCallback(queue))

# nodes must know about each other
# nodes must have session between each other
check:
node1.addNode(node2.localNode)
node2.addNode(node1.localNode)
(await node1.ping(node2.localNode)).isOk()

let numOfBytes = 5000
let clientSocketResult = await utp1.connectTo(node2.localNode)
let clientSocketResult = await utp1.connectTo(NodeAddress.init(node2.localNode).unsafeGet())
let clientSocket = clientSocketResult.get()

let serverSocket = await queue.get()
Expand All @@ -135,7 +134,7 @@ procSuite "Utp protocol over discovery v5 tests":
let
allowedId: uint16 = 10
lowSynTimeout = milliseconds(500)
queue = newAsyncQueue[UtpSocket[Node]]()
queue = newAsyncQueue[UtpSocket[NodeAddress]]()
node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302))
node2 = initDiscoveryNode(
Expand All @@ -154,13 +153,12 @@ procSuite "Utp protocol over discovery v5 tests":
allowOneIdCallback(allowedId),
SocketConfig.init())

# nodes must know about each other
# nodes must have session between each other
check:
node1.addNode(node2.localNode)
node2.addNode(node1.localNode)
(await node1.ping(node2.localNode)).isOk()

let clientSocketResult1 = await utp1.connectTo(node2.localNode, allowedId)
let clientSocketResult2 = await utp1.connectTo(node2.localNode, allowedId + 1)
let clientSocketResult1 = await utp1.connectTo(NodeAddress.init(node2.localNode).unsafeGet(), allowedId)
let clientSocketResult2 = await utp1.connectTo(NodeAddress.init(node2.localNode).unsafeGet(), allowedId + 1)

check:
clientSocketResult1.isOk()
Expand All @@ -180,7 +178,7 @@ procSuite "Utp protocol over discovery v5 tests":

asyncTest "Configure incoming connections to be in connected state":
let
queue = newAsyncQueue[UtpSocket[Node]]()
queue = newAsyncQueue[UtpSocket[NodeAddress]]()
node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302))
node2 = initDiscoveryNode(
Expand All @@ -194,12 +192,11 @@ procSuite "Utp protocol over discovery v5 tests":
socketConfig = SocketConfig.init(incomingSocketReceiveTimeout = none[Duration]())
)

# nodes must know about each other
# nodes must have session between each other
check:
node1.addNode(node2.localNode)
node2.addNode(node1.localNode)
(await node1.ping(node2.localNode)).isOk()

let clientSocketResult = await utp1.connectTo(node2.localNode)
let clientSocketResult = await utp1.connectTo(NodeAddress.init(node2.localNode).unsafeGet())
let clientSocket = clientSocketResult.get()
let serverSocket = await queue.get()

Expand Down