From 6ab779d30a6f294edef67f8e8d707ea4b09c1f6d Mon Sep 17 00:00:00 2001 From: Tanguy Date: Wed, 11 Oct 2023 11:35:00 +0200 Subject: [PATCH 01/18] MultiAddress support --- libp2p/multiaddress.nim | 47 ++++++++++++++++++++++++++++++++++++-- libp2p/multicodec.nim | 3 +++ tests/testmultiaddress.nim | 12 +++++++--- 3 files changed, 57 insertions(+), 5 deletions(-) diff --git a/libp2p/multiaddress.nim b/libp2p/multiaddress.nim index 02f5c48037..38835f6071 100644 --- a/libp2p/multiaddress.nim +++ b/libp2p/multiaddress.nim @@ -297,6 +297,33 @@ proc dnsVB(vb: var VBuffer): bool = if s.find('/') == -1: result = true +proc certHashStB(s: string, vb: var VBuffer): bool = + ## CertHash address stringToBuffer() implementation. + var data = MultiBase.decode(s).valueOr: + return false + var mh: MultiHash + if MultiHash.decode(data, mh).isOk: + vb.writeSeq(data) + result = true + +proc certHashBtS(vb: var VBuffer, s: var string): bool = + ## CertHash address bufferToString() implementation. + var address = newSeq[byte]() + if vb.readSeq(address) > 0: + var mh: MultiHash + if MultiHash.decode(address, mh).isOk: + s = MultiBase.encode("base64", address).valueOr: + return false + result = true + +proc certHashVB(vb: var VBuffer): bool = + ## CertHash address validateBuffer() implementation. + var address = newSeq[byte]() + if vb.readSeq(address) > 0: + var mh: MultiHash + if MultiHash.decode(address, mh).isOk: + result = true + proc mapEq*(codec: string): MaPattern = ## ``Equal`` operator for pattern result.operator = Eq @@ -358,6 +385,11 @@ const bufferToString: dnsBtS, validateBuffer: dnsVB ) + TranscoderCertHash* = Transcoder( + stringToBuffer: certHashStB, + bufferToString: certHashBtS, + validateBuffer: certHashVB + ) ProtocolsList = [ MAProtocol( mcodec: multiCodec("ip4"), kind: Fixed, size: 4, @@ -458,7 +490,17 @@ const ), MAProtocol( mcodec: multiCodec("p2p-webrtc-direct"), kind: Marker, size: 0 - ) + ), + MAProtocol( + mcodec: multiCodec("webrtc"), kind: Marker, size: 0 + ), + MAProtocol( + mcodec: multiCodec("webrtc-direct"), kind: Marker, size: 0 + ), + MAProtocol( + mcodec: multiCodec("certhash"), kind: Length, size: 0, + coder: TranscoderCertHash + ), ] DNSANY* = mapEq("dns") @@ -489,6 +531,7 @@ const WebSockets_DNS* = mapOr(WS_DNS, WSS_DNS) WebSockets_IP* = mapOr(WS_IP, WSS_IP) WebSockets* = mapOr(WS, WSS) + WebRtcDirect2* = mapAnd(UDP, mapEq("webrtc-direct"), mapEq("certhash")) Onion3* = mapEq("onion3") TcpOnion3* = mapAnd(TCP, Onion3) @@ -512,7 +555,7 @@ const mapAnd(DNS, mapEq("https")) ) - WebRTCDirect* = mapOr( + WebRTCDirect* {.deprecated.} = mapOr( mapAnd(HTTP, mapEq("p2p-webrtc-direct")), mapAnd(HTTPS, mapEq("p2p-webrtc-direct")) ) diff --git a/libp2p/multicodec.nim b/libp2p/multicodec.nim index 184da57121..eb15fd3012 100644 --- a/libp2p/multicodec.nim +++ b/libp2p/multicodec.nim @@ -193,11 +193,14 @@ const MultiCodecList = [ ("https", 0x01BB), ("tls", 0x01C0), ("quic", 0x01CC), + ("certhash", 0x01D2), ("ws", 0x01DD), ("wss", 0x01DE), ("p2p-websocket-star", 0x01DF), # not in multicodec list ("p2p-webrtc-star", 0x0113), # not in multicodec list ("p2p-webrtc-direct", 0x0114), # not in multicodec list + ("webrtc-direct", 0x0118), + ("webrtc", 0x0119), ("onion", 0x01BC), ("onion3", 0x01BD), ("p2p-circuit", 0x0122), diff --git a/tests/testmultiaddress.nim b/tests/testmultiaddress.nim index 025909e48b..1d7edc72c3 100644 --- a/tests/testmultiaddress.nim +++ b/tests/testmultiaddress.nim @@ -147,7 +147,9 @@ const "/ip4/127.0.0.1/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC/tcp/1234", "/ip6/2001:8a0:7ac5:4201:3ac9:86ff:fe31:7095/tcp/8000/ws/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC", "/p2p-webrtc-star/ip4/127.0.0.1/tcp/9090/ws/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC", - "/ip4/127.0.0.1/tcp/9090/p2p-circuit/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC" + "/ip4/127.0.0.1/tcp/9090/p2p-circuit/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC", + "/ip4/127.0.0.1/udp/1234/webrtc-direct", + "/ip4/127.0.0.1/udp/1234/webrtc-direct/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g", ] RustSuccessExpects = [ @@ -177,7 +179,9 @@ const "047F000001A503221220D52EBB89D85B02A284948203A62FF28389C57C9F42BEEC4EC20DB76A68911C0B0604D2", "29200108A07AC542013AC986FFFE317095061F40DD03A503221220D52EBB89D85B02A284948203A62FF28389C57C9F42BEEC4EC20DB76A68911C0B", "9302047F000001062382DD03A503221220D52EBB89D85B02A284948203A62FF28389C57C9F42BEEC4EC20DB76A68911C0B", - "047F000001062382A202A503221220D52EBB89D85B02A284948203A62FF28389C57C9F42BEEC4EC20DB76A68911C0B" + "047F000001062382A202A503221220D52EBB89D85B02A284948203A62FF28389C57C9F42BEEC4EC20DB76A68911C0B", + "047F000001910204D29802", + "047F000001910204D29802D203221220C3AB8FF13720E8AD9047DD39466B3C8974E592C2FA383D4A3960714CAEF0C4F2" ] RustFailureVectors = [ @@ -211,7 +215,9 @@ const "/ip4/127.0.0.1/tcp", "/ip4/127.0.0.1/ipfs", "/ip4/127.0.0.1/ipfs/tcp", - "/p2p-circuit/50" + "/p2p-circuit/50", + "/ip4/127.0.0.1/udp/1234/webrtc-direct/certhash/b2uaraocy6yrdblb4sfptaddgimjmmp", + "/ip4/127.0.0.1/udp/1234/webrtc-direct/certhash" ] PathVectors = [ From e0f2b00f9a0b9c35690505700fde5935d88e7f09 Mon Sep 17 00:00:00 2001 From: Tanguy Date: Wed, 11 Oct 2023 16:18:53 +0200 Subject: [PATCH 02/18] WebRTC scaffolding --- .pinned | 9 +- libp2p.nimble | 1 + libp2p/transports/tcptransport.nim | 2 +- libp2p/transports/webrtctransport.nim | 407 ++++++++++++++++++++++++++ libp2p/transports/wstransport.nim | 2 - 5 files changed, 415 insertions(+), 6 deletions(-) create mode 100644 libp2p/transports/webrtctransport.nim diff --git a/.pinned b/.pinned index 3d79f8b63c..dfa76df1db 100644 --- a/.pinned +++ b/.pinned @@ -1,17 +1,20 @@ bearssl;https://github.com/status-im/nim-bearssl@#e4157639db180e52727712a47deaefcbbac6ec86 +binary_serialization;https://github.com/status-im/nim-binary-serialization.git@#f9c05ed21e6be17c0d1e16a364a0ab19fe4a64bf chronicles;https://github.com/status-im/nim-chronicles@#32ac8679680ea699f7dbc046e8e0131cac97d41a chronos;https://github.com/status-im/nim-chronos@#ba143e029f35fd9b4cd3d89d007cc834d0d5ba3c dnsclient;https://github.com/ba0f3/dnsclient.nim@#23214235d4784d24aceed99bbfe153379ea557c8 faststreams;https://github.com/status-im/nim-faststreams@#720fc5e5c8e428d9d0af618e1e27c44b42350309 -httputils;https://github.com/status-im/nim-http-utils@#3b491a40c60aad9e8d3407443f46f62511e63b18 +httputils;https://github.com/status-im/nim-http-utils@#87b7cbf032c90b9e6b446081f4a647e950362cec json_serialization;https://github.com/status-im/nim-json-serialization@#85b7ea093cb85ee4f433a617b97571bd709d30df +mbedtls;https://github.com/status-im/nim-mbedtls.git@#1167c90f1e5c1f4872868720140ff5b0840d18be metrics;https://github.com/status-im/nim-metrics@#6142e433fc8ea9b73379770a788017ac528d46ff -nimcrypto;https://github.com/cheatfate/nimcrypto@#1c8d6e3caf3abc572136ae9a1da81730c4eb4288 +nimcrypto;https://github.com/cheatfate/nimcrypto@#a079df92424968d46a6ac258299ce9380aa153f2 results;https://github.com/arnetheduck/nim-results@#f3c666a272c69d70cb41e7245e7f6844797303ad secp256k1;https://github.com/status-im/nim-secp256k1@#7246d91c667f4cc3759fdd50339caa45a2ecd8be serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe54049950e352bb969aab97173b35 stew;https://github.com/status-im/nim-stew@#3159137d9a3110edb4024145ce0ba778975de40e testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34 unittest2;https://github.com/status-im/nim-unittest2@#2300fa9924a76e6c96bc4ea79d043e3a0f27120c +webrtc;https://github.com/status-im/nim-webrtc.git@#6174511f5b8f89152252f39404e8f30e6dcd7c4c websock;https://github.com/status-im/nim-websock@#f8ed9b40a5ff27ad02a3c237c4905b0924e3f982 -zlib;https://github.com/status-im/nim-zlib@#38b72eda9d70067df4a953f56b5ed59630f2a17b \ No newline at end of file +zlib;https://github.com/status-im/nim-zlib@#a2f44bb7f65571a894227ff6fde9298a104e03a5 \ No newline at end of file diff --git a/libp2p.nimble b/libp2p.nimble index 7fd5fb8548..248607e1d8 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -17,6 +17,7 @@ requires "nim >= 1.6.0", "secp256k1", "stew#head", "websock", + "https://github.com/status-im/nim-webrtc.git", "unittest2 >= 0.0.5 & <= 0.1.0" let nimc = getEnv("NIMC", "nim") # Which nim compiler to use diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index 18dd235fe8..e134e6de83 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -172,7 +172,7 @@ method start*( self.servers &= server - trace "Listening on", address = ma + trace "Listening on", address = self.addrs[i] method stop*(self: TcpTransport) {.async, gcsafe.} = ## stop the transport diff --git a/libp2p/transports/webrtctransport.nim b/libp2p/transports/webrtctransport.nim new file mode 100644 index 0000000000..033856f53b --- /dev/null +++ b/libp2p/transports/webrtctransport.nim @@ -0,0 +1,407 @@ +# Nim-LibP2P +# Copyright (c) 2023 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +## WebRtc transport implementation +## For now, only support WebRtc direct (ie browser to server) + +{.push raises: [].} + +import std/[sequtils] +import stew/results +import chronos, chronicles +import transport, + ../errors, + ../wire, + ../multicodec, + ../connmanager, + ../multiaddress, + ../stream/connection, + ../upgrademngrs/upgrade, + ../utility + +logScope: + topics = "libp2p webrtctransport" + +export transport, results + +const + WebRtcTransportTrackerName* = "libp2p.webrtctransport" + +# -- Message -- +type + MessageFlag = enum + Fin = 0 + StopSending = 1 + ResetStream = 2 + FinAck = 3 + + WebRtcMessage = object + flag: Opt[MessageFlag] + data: seq[byte] + +proc decode(_: type WebRtcMessage, bytes: seq[byte]): Opt[WebRtcMessage] = + var + pb = initProtoBuffer(bytes) + flagOrd: uint32 + res: WebRtcMessage + if ? pb.getField(1, flagOrd): + var flag: MessageFlag + if flag.checkEnumAssign(flagOrd): + res.flag = Opt.some(flag) + + discard ? pb.getField(2, res.data) + Opt.some(res) + +proc encode(msg: WebRtcMessage): seq[byte] = + var pb = initProtoBuffer() + + msg.flag.withValue(val): + pb.writeField(1, val) + + if msg.data.len > 0: + pb.writeField(2, msg.data) + + pb.finish() + pb.buffer + +# -- Stream -- +const MaxMessageSize = 16384 # 16KiB + +type + WebRtcState = enum + Sending, Closing, Closed + + WebRtcStream = ref object of Connection + dataChannel: DataChannel + sendQueue: seq[(seq[byte], Future[void])] + sendLoop: Future[void] + readData: seq[byte] + txState: WebRtcState + rxState: WebRtcState + +proc new( + _: type WebRtcStream, + dataChannel: DataChannel, + oaddr: Opt[MultiAddress], + peerId: PeerId): WebRtcStream = + let stream = WebRtcStream(stream: stream, observedAddr: oaddr, peerId: peerId) + procCall Connection(stream).initStream() + stream + +proc sender(s: WebRtcConnection) {.async.} = + while s.sendQueue.len > 0: + let (message, fut) = s.sendQueue.pop() + #TODO handle exceptions + await s.dataChannel.write(message) + if not fut.isNil: fut.complete() + +proc send(s: WebRtcConnection, msg: WebRtcMessage, fut: Future[void] = nil) = + let wrappedMessage = msg.encode() + s.sendQueue.insert((wrappedMessage, fut)) + + if s.sendLoop == nil or s.sendLoop.finished: + s.sendLoop = s.sender() + +method write*(s: WebRtcConnection, msg: seq[byte]): Future[void] = + # We need to make sure we send all of our data before another write + # Otherwise, two concurrent writes could get intertwined + # We avoid this by filling the s.sendQueue synchronously + + let retFuture = newFuture[void]("WebRtcConnection.write") + if s.txState != Sending: + retFuture.fail(newLPStreamClosedError()) + return retFuture + + var messages: seq[seq[byte]] + while msg.len > MaxMessageSize - 16: + let + endOfMessage = MaxMessageSize - 16 + wrappedMessage = WebRtcMessage(data: msg[0 ..< endOfMessage]) + s.send(wrappedMessage) + msg = msg[endOfMessage .. ^1] + + let + wrappedMessage = WebRtcMessage(data: msg) + s.send(wrappedMessage, retFuture) + + return retFuture + +proc actuallyClose(s: WebRtcConnection) {.async.} = + if s.rxState == Closed and s.txState == Closed and s.readData.len == 0: + await s.conn.close() + await procCall Connection(s).closeImpl() + +method readOnce*(s: WebRtcConnection, pbytes: pointer, nbytes: int): Future[int] {.async.} = + if s.atEof: + raise newLPStreamEOFError() + + while s.readData.len == 0: + if s.rxState == Closed: + s.atEof = true + await s.actuallyClose() + return 0 + + let + #TODO handle exceptions + message = await s.conn.read() + decoded = WebRtcMessage.decode(message) + + decoded.flag.withValue(flag): + case flag: + of Fin: + # Peer won't send any more data + s.rxState = Closed + s.send(WebRtcMessage(flag: Opt.some(FinAck))) + of FinAck: + s.txState = Closed + await s.actuallyClose() + + s.readData = decoded.data + + result = min(nbytes, s.readData.len) + copyMem(pbytes, addr s.readData[0], toCopy) + stream.cached = stream.cached[result..^1] + +method closeImpl*(s: WebRtcConnection) {.async.} = + s.send(WebRtcMessage(flag: Opt.some(Fin))) + s.txState = Closing + await s.join() #TODO ?? + +# -- Connection -- +type WebRtcConnection = ref object of Connection + connection: DataChannelConnection + +method close*(conn: WebRtcConnection) {.async.} = + #TODO + discard + +proc getStream*(conn: WebRtcConnection, + direction: Direction): Future[WebRtcStream] {.async.} = + var datachannel = + case direction: + of Direction.In: + await conn.connection.incomingStream() + of Direction.Out: + await conn.connection.openStream() + return WebRtcStream.new(datachannel, conn.observedAddr, conn.peerId) + +# -- Muxer -- +type WebRtcMuxer = ref object of Muxer + webRtcConn: WebRtcConnection + handleFut: Future[void] + +method newStream*(m: WebRtcMuxer, name: string = "", lazy: bool = false): Future[Connection] {.async, gcsafe.} = + return await m.webRtcConn.getStream(Direction.Out) + +proc handleStream(m: WebRtcMuxer, chann: WebRtcStream) {.async.} = + try: + await m.streamHandler(chann) + trace "finished handling stream" + doAssert(chann.closed, "connection not closed by handler!") + except CatchableError as exc: + trace "Exception in mplex stream handler", msg = exc.msg + await chann.close() + +method handle*(m: WebRtcMuxer): Future[void] {.async, gcsafe.} = + try: + while not m.webRtcConn.atEof: + let incomingStream = await m.webRtcConn.getStream(Direction.In) + asyncSpawn m.handleStream(incomingStream) + finally: + await m.webRtcConn.close() + +method close*(m: WebRtcMuxer) {.async, gcsafe.} = + m.handleFut.cancel() + await m.webRtcConn.close() + +# -- Upgrader -- +type WebRtcUpgrade = ref object of Upgrade + +method upgrade*( + self: WebRtcUpgrade, + conn: Connection, + direction: Direction, + peerId: Opt[PeerId]): Future[Muxer] {.async.} = + + let webRtcConn = WebRtcConnection(conn) + result = WebRtcMuxer(webRtcConn: webRtcConn) + + # Noise handshake + let noiseHandler = self.secureManagers.filterIt(it of Noise) + assert noiseHandler.len > 0 + + let + stream = webRtcConn.getStream(Out) #TODO add channelId: 0 + secureStream = noiseHandler[0].handshake( + stream, + initiator: true, # we are always the initiator in webrtc-direct + peerId: peerId + #TODO: add prelude data + ) + + # Peer proved its identity, we can close this + await secureStream.close() + await stream.close() + +# -- Transport -- +type + WebRtcTransport* = ref object of Transport + connectionsTimeout: Duration + servers: seq[WebRtc] + clients: array[Direction, seq[DataChannelConnection]] + + WebRtcTransportTracker* = ref object of TrackerBase + opened*: uint64 + closed*: uint64 + + WebRtcTransportError* = object of transport.TransportError + +proc setupWebRtcTransportTracker(): WebRtcTransportTracker {.gcsafe, raises: [].} + +proc getWebRtcTransportTracker(): WebRtcTransportTracker {.gcsafe.} = + result = cast[WebRtcTransportTracker](getTracker(WebRtcTransportTrackerName)) + if isNil(result): + result = setupWebRtcTransportTracker() + +proc dumpTracking(): string {.gcsafe.} = + var tracker = getWebRtcTransportTracker() + result = "Opened tcp transports: " & $tracker.opened & "\n" & + "Closed tcp transports: " & $tracker.closed + +proc leakTransport(): bool {.gcsafe.} = + var tracker = getWebRtcTransportTracker() + result = (tracker.opened != tracker.closed) + +proc setupWebRtcTransportTracker(): WebRtcTransportTracker = + result = new WebRtcTransportTracker + result.opened = 0 + result.closed = 0 + result.dump = dumpTracking + result.isLeaked = leakTransport + addTracker(WebRtcTransportTrackerName, result) + +proc new*( + T: typedesc[WebRtcTransport], + upgrade: Upgrade, + connectionsTimeout = 10.minutes): T {.public.} = + + let + transport = T( + upgrader: WebRtcUpgrade(secureManagers: upgrade.secureManagers), + connectionsTimeout: connectionsTimeout) + + return transport + +method start*( + self: WebRtcTransport, + addrs: seq[MultiAddress]) {.async.} = + ## listen on the transport + ## + + if self.running: + warn "WebRtc transport already running" + return + + await procCall Transport(self).start(addrs) + trace "Starting WebRtc transport" + inc getWebRtcTransportTracker().opened + + for i, ma in addrs: + if not self.handles(ma): + trace "Invalid address detected, skipping!", address = ma + continue + + let + transportAddress = initTAddress(ma) + server = WebRtc.new(transportAddress) + server.start() + + self.servers &= server + + self.addrs[i] = MultiAddress.init(server.getLocalAddress(), IPPROTO_UDP).tryGet() + #TODO add webrtc-direct & certhash + + trace "Listening on", address = self.addrs[i] + +proc connHandler(self: WebRtcTransport, + client: DataChannelConnection, + observedAddr: Opt[MultiAddress], + dir: Direction): Future[Connection] {.async.} = + + trace "Handling ws connection", address = $observedAddr, + dir = $dir, + clients = self.clients[Direction.In].len + + self.clients[Direction.Out].len + + let conn: Connection = + WebRtcConnection.new( + client = client, + dir = dir, + observedAddr = observedAddr, + timeout = self.connectionsTimeout + ) + + proc onClose() {.async.} = + try: + let futs = @[client.join(), conn.join()] + await futs[0] or futs[1] + for f in futs: + if not f.finished: await f.cancelAndWait() # cancel outstanding join() + + trace "Cleaning up client", addrs = $client.remoteAddress, + conn + + self.clients[dir].keepItIf( it != client ) + await allFuturesThrowing( + conn.close(), client.closeWait()) + + trace "Cleaned up client", addrs = $client.remoteAddress, + conn + + except CatchableError as exc: + let useExc {.used.} = exc + debug "Error cleaning up client", errMsg = exc.msg, conn + + self.clients[dir].add(client) + asyncSpawn onClose() + + return conn + +method accept*(self: WebRtcTransport): Future[Connection] {.async, gcsafe.} = + if not self.running: + raise newTransportClosedError() + + #TODO handle errors + if self.acceptFuts.len <= 0: + self.acceptFuts = self.servers.mapIt(it.accept()) + + if self.acceptFuts.len <= 0: + return + + let + finished = await one(self.acceptFuts) + index = self.acceptFuts.find(finished) + + self.acceptFuts[index] = self.servers[index].accept() + + let transp = await finished + try: + let observedAddr = MultiAddress.init(transp.remoteAddress).tryGet() + return await self.connHandler(transp, Opt.some(observedAddr), Direction.In) + except CancelledError as exc: + transp.close() + raise exc + except CatchableError as exc: + debug "Failed to handle connection", exc = exc.msg + transp.close() + +method handles*(t: WebRtcTransport, address: MultiAddress): bool {.gcsafe.} = + if procCall Transport(t).handles(address): + if address.protocols.isOk: + return WebRtcDirect2.match(address) diff --git a/libp2p/transports/wstransport.nim b/libp2p/transports/wstransport.nim index 97f6cf08eb..aed1c2a285 100644 --- a/libp2p/transports/wstransport.nim +++ b/libp2p/transports/wstransport.nim @@ -171,8 +171,6 @@ method start*( trace "Listening on", addresses = self.addrs - self.running = true - method stop*(self: WsTransport) {.async, gcsafe.} = ## stop the transport ## From 30e93e7c0a113630e88ae8352977fc6efd69ef3f Mon Sep 17 00:00:00 2001 From: Tanguy Date: Fri, 13 Oct 2023 18:07:52 +0200 Subject: [PATCH 03/18] almost compiling --- .pinned | 4 +- libp2p/transports/webrtctransport.nim | 116 ++++++++++++++++---------- testwebrtc.nim | 16 ++++ 3 files changed, 90 insertions(+), 46 deletions(-) create mode 100644 testwebrtc.nim diff --git a/.pinned b/.pinned index dfa76df1db..bb6c61dd73 100644 --- a/.pinned +++ b/.pinned @@ -15,6 +15,6 @@ serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe540499 stew;https://github.com/status-im/nim-stew@#3159137d9a3110edb4024145ce0ba778975de40e testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34 unittest2;https://github.com/status-im/nim-unittest2@#2300fa9924a76e6c96bc4ea79d043e3a0f27120c -webrtc;https://github.com/status-im/nim-webrtc.git@#6174511f5b8f89152252f39404e8f30e6dcd7c4c +webrtc;https://github.com/status-im/nim-webrtc.git@#7dfb18eefdbd2e722cdec6d6a7b255015365d308 websock;https://github.com/status-im/nim-websock@#f8ed9b40a5ff27ad02a3c237c4905b0924e3f982 -zlib;https://github.com/status-im/nim-zlib@#a2f44bb7f65571a894227ff6fde9298a104e03a5 \ No newline at end of file +zlib;https://github.com/status-im/nim-zlib@#a2f44bb7f65571a894227ff6fde9298a104e03a5 diff --git a/libp2p/transports/webrtctransport.nim b/libp2p/transports/webrtctransport.nim index 033856f53b..d06a1795c2 100644 --- a/libp2p/transports/webrtctransport.nim +++ b/libp2p/transports/webrtctransport.nim @@ -13,18 +13,23 @@ {.push raises: [].} import std/[sequtils] -import stew/results +import stew/[endians2, byteutils, objects, results] import chronos, chronicles import transport, ../errors, ../wire, ../multicodec, + ../protobuf/minprotobuf, ../connmanager, + ../muxers/muxer, ../multiaddress, ../stream/connection, ../upgrademngrs/upgrade, + ../protocols/secure/noise, ../utility +import webrtc/webrtc, webrtc/datachannel + logScope: topics = "libp2p webrtctransport" @@ -50,22 +55,22 @@ proc decode(_: type WebRtcMessage, bytes: seq[byte]): Opt[WebRtcMessage] = pb = initProtoBuffer(bytes) flagOrd: uint32 res: WebRtcMessage - if ? pb.getField(1, flagOrd): + if ? pb.getField(1, flagOrd).toOpt(): var flag: MessageFlag - if flag.checkEnumAssign(flagOrd): + if flag.checkedEnumAssign(flagOrd): res.flag = Opt.some(flag) - discard ? pb.getField(2, res.data) + discard ? pb.getField(2, res.data).toOpt() Opt.some(res) proc encode(msg: WebRtcMessage): seq[byte] = var pb = initProtoBuffer() msg.flag.withValue(val): - pb.writeField(1, val) + pb.write(1, uint32(val)) if msg.data.len > 0: - pb.writeField(2, msg.data) + pb.write(2, msg.data) pb.finish() pb.buffer @@ -78,7 +83,7 @@ type Sending, Closing, Closed WebRtcStream = ref object of Connection - dataChannel: DataChannel + dataChannel: DataChannelStream sendQueue: seq[(seq[byte], Future[void])] sendLoop: Future[void] readData: seq[byte] @@ -87,33 +92,34 @@ type proc new( _: type WebRtcStream, - dataChannel: DataChannel, + dataChannel: DataChannelStream, oaddr: Opt[MultiAddress], peerId: PeerId): WebRtcStream = - let stream = WebRtcStream(stream: stream, observedAddr: oaddr, peerId: peerId) + let stream = WebRtcStream(dataChannel: dataChannel, observedAddr: oaddr, peerId: peerId) procCall Connection(stream).initStream() stream -proc sender(s: WebRtcConnection) {.async.} = +proc sender(s: WebRtcStream) {.async.} = while s.sendQueue.len > 0: let (message, fut) = s.sendQueue.pop() #TODO handle exceptions await s.dataChannel.write(message) if not fut.isNil: fut.complete() -proc send(s: WebRtcConnection, msg: WebRtcMessage, fut: Future[void] = nil) = +proc send(s: WebRtcStream, msg: WebRtcMessage, fut: Future[void] = nil) = let wrappedMessage = msg.encode() s.sendQueue.insert((wrappedMessage, fut)) if s.sendLoop == nil or s.sendLoop.finished: s.sendLoop = s.sender() -method write*(s: WebRtcConnection, msg: seq[byte]): Future[void] = +method write*(s: WebRtcStream, msg2: seq[byte]): Future[void] = # We need to make sure we send all of our data before another write # Otherwise, two concurrent writes could get intertwined # We avoid this by filling the s.sendQueue synchronously - let retFuture = newFuture[void]("WebRtcConnection.write") + var msg = msg2 + let retFuture = newFuture[void]("WebRtcStream.write") if s.txState != Sending: retFuture.fail(newLPStreamClosedError()) return retFuture @@ -132,25 +138,25 @@ method write*(s: WebRtcConnection, msg: seq[byte]): Future[void] = return retFuture -proc actuallyClose(s: WebRtcConnection) {.async.} = +proc actuallyClose(s: WebRtcStream) {.async.} = if s.rxState == Closed and s.txState == Closed and s.readData.len == 0: - await s.conn.close() + #TODO add support to DataChannel + #await s.dataChannel.close() await procCall Connection(s).closeImpl() -method readOnce*(s: WebRtcConnection, pbytes: pointer, nbytes: int): Future[int] {.async.} = - if s.atEof: +method readOnce*(s: WebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.async.} = + if s.rxState == Closed: raise newLPStreamEOFError() while s.readData.len == 0: if s.rxState == Closed: - s.atEof = true await s.actuallyClose() return 0 let #TODO handle exceptions - message = await s.conn.read() - decoded = WebRtcMessage.decode(message) + message = await s.dataChannel.read() + decoded = WebRtcMessage.decode(message).tryGet() decoded.flag.withValue(flag): case flag: @@ -161,14 +167,15 @@ method readOnce*(s: WebRtcConnection, pbytes: pointer, nbytes: int): Future[int] of FinAck: s.txState = Closed await s.actuallyClose() + else: discard s.readData = decoded.data result = min(nbytes, s.readData.len) - copyMem(pbytes, addr s.readData[0], toCopy) - stream.cached = stream.cached[result..^1] + copyMem(pbytes, addr s.readData[0], result) + s.readData = s.readData[result..^1] -method closeImpl*(s: WebRtcConnection) {.async.} = +method closeImpl*(s: WebRtcStream) {.async.} = s.send(WebRtcMessage(flag: Opt.some(Fin))) s.txState = Closing await s.join() #TODO ?? @@ -181,14 +188,23 @@ method close*(conn: WebRtcConnection) {.async.} = #TODO discard +proc new( + _: type WebRtcConnection, + conn: DataChannelConnection, + observedAddr: Opt[MultiAddress] + ): WebRtcConnection = + let co = WebRtcConnection(connection: conn, observedAddr: observedAddr) + procCall Connection(co).initStream() + co + proc getStream*(conn: WebRtcConnection, direction: Direction): Future[WebRtcStream] {.async.} = var datachannel = case direction: of Direction.In: - await conn.connection.incomingStream() + await conn.connection.accept() of Direction.Out: - await conn.connection.openStream() + await conn.connection.openStream(0) #TODO don't hardcode stream id (should be in nim-webrtc) return WebRtcStream.new(datachannel, conn.observedAddr, conn.peerId) # -- Muxer -- @@ -208,9 +224,12 @@ proc handleStream(m: WebRtcMuxer, chann: WebRtcStream) {.async.} = trace "Exception in mplex stream handler", msg = exc.msg await chann.close() +#TODO add atEof + method handle*(m: WebRtcMuxer): Future[void] {.async, gcsafe.} = try: - while not m.webRtcConn.atEof: + #while not m.webRtcConn.atEof: + while true: let incomingStream = await m.webRtcConn.getStream(Direction.In) asyncSpawn m.handleStream(incomingStream) finally: @@ -237,11 +256,11 @@ method upgrade*( assert noiseHandler.len > 0 let - stream = webRtcConn.getStream(Out) #TODO add channelId: 0 - secureStream = noiseHandler[0].handshake( + stream = await webRtcConn.getStream(Out) #TODO add channelId: 0 + secureStream = await noiseHandler[0].handshake( stream, - initiator: true, # we are always the initiator in webrtc-direct - peerId: peerId + initiator = true, # we are always the initiator in webrtc-direct + peerId = peerId #TODO: add prelude data ) @@ -254,6 +273,7 @@ type WebRtcTransport* = ref object of Transport connectionsTimeout: Duration servers: seq[WebRtc] + acceptFuts: seq[Future[DataChannelConnection]] clients: array[Direction, seq[DataChannelConnection]] WebRtcTransportTracker* = ref object of TrackerBase @@ -318,14 +338,17 @@ method start*( continue let - transportAddress = initTAddress(ma) + transportAddress = initTAddress(ma[0..1].tryGet()).tryGet() server = WebRtc.new(transportAddress) - server.start() + server.listen() self.servers &= server - self.addrs[i] = MultiAddress.init(server.getLocalAddress(), IPPROTO_UDP).tryGet() - #TODO add webrtc-direct & certhash + let + cert = server.dtls.localCertificate + certHash = MultiHash.digest("sha2-256", cert).get().data.buffer + encodedCertHash = MultiBase.encode("base64", certHash).get() + self.addrs[i] = (MultiAddress.init(server.udp.laddr, IPPROTO_UDP).tryGet() & MultiAddress.init(multiCodec("webrtc-direct")).tryGet() & MultiAddress.init(multiCodec("cert-hash"), encodedCertHash).tryGet()).tryGet() trace "Listening on", address = self.addrs[i] @@ -341,15 +364,15 @@ proc connHandler(self: WebRtcTransport, let conn: Connection = WebRtcConnection.new( - client = client, - dir = dir, - observedAddr = observedAddr, - timeout = self.connectionsTimeout + conn = client, + # dir = dir, + observedAddr = observedAddr + # timeout = self.connectionsTimeout ) proc onClose() {.async.} = try: - let futs = @[client.join(), conn.join()] + let futs = @[conn.join(), conn.join()] #TODO that's stupid await futs[0] or futs[1] for f in futs: if not f.finished: await f.cancelAndWait() # cancel outstanding join() @@ -358,8 +381,9 @@ proc connHandler(self: WebRtcTransport, conn self.clients[dir].keepItIf( it != client ) - await allFuturesThrowing( - conn.close(), client.closeWait()) + #TODO + #await allFuturesThrowing( + # conn.close(), client.closeWait()) trace "Cleaned up client", addrs = $client.remoteAddress, conn @@ -392,14 +416,18 @@ method accept*(self: WebRtcTransport): Future[Connection] {.async, gcsafe.} = let transp = await finished try: - let observedAddr = MultiAddress.init(transp.remoteAddress).tryGet() + #TODO add remoteAddress to DataChannelConnection + #let observedAddr = MultiAddress.init(transp.remoteAddress).tryGet() #TODO add /webrtc-direct + let observedAddr = MultiAddress.init("/ip4/127.0.0.1").tryGet() return await self.connHandler(transp, Opt.some(observedAddr), Direction.In) except CancelledError as exc: - transp.close() + #TODO + #transp.close() raise exc except CatchableError as exc: debug "Failed to handle connection", exc = exc.msg - transp.close() + #TODO + #transp.close() method handles*(t: WebRtcTransport, address: MultiAddress): bool {.gcsafe.} = if procCall Transport(t).handles(address): diff --git a/testwebrtc.nim b/testwebrtc.nim new file mode 100644 index 0000000000..17fb6f60a4 --- /dev/null +++ b/testwebrtc.nim @@ -0,0 +1,16 @@ +import chronos, libp2p, libp2p/transports/webrtctransport + +proc main {.async.} = + let switch = + SwitchBuilder.new() + .withAddress(MultiAddress.init("/ip4/127.0.0.1/udp/4242/webrtc-direct/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g").tryGet()) #TODO the certhash shouldn't be necessary + .withRng(crypto.newRng()) + .withMplex() + .withTransport(proc (upgr: Upgrade): Transport = WebRtcTransport.new(upgr)) + .withNoise() + .build() + + await switch.start() + await sleepAsync(1.hours) + +waitFor main() From c6460ea7ce4a55f696c3934b27320b7f3bc2ac02 Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Thu, 19 Oct 2023 12:12:56 +0200 Subject: [PATCH 04/18] fixes pinned + webrtctransport --- .pinned | 4 ++-- libp2p/transports/webrtctransport.nim | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.pinned b/.pinned index bb6c61dd73..661f150e5f 100644 --- a/.pinned +++ b/.pinned @@ -1,5 +1,5 @@ bearssl;https://github.com/status-im/nim-bearssl@#e4157639db180e52727712a47deaefcbbac6ec86 -binary_serialization;https://github.com/status-im/nim-binary-serialization.git@#f9c05ed21e6be17c0d1e16a364a0ab19fe4a64bf +binary_serialization;https://github.com/status-im/nim-binary-serialization.git@#38a73a70fd43f3835ca01a877353858b19e39d70 chronicles;https://github.com/status-im/nim-chronicles@#32ac8679680ea699f7dbc046e8e0131cac97d41a chronos;https://github.com/status-im/nim-chronos@#ba143e029f35fd9b4cd3d89d007cc834d0d5ba3c dnsclient;https://github.com/ba0f3/dnsclient.nim@#23214235d4784d24aceed99bbfe153379ea557c8 @@ -15,6 +15,6 @@ serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe540499 stew;https://github.com/status-im/nim-stew@#3159137d9a3110edb4024145ce0ba778975de40e testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34 unittest2;https://github.com/status-im/nim-unittest2@#2300fa9924a76e6c96bc4ea79d043e3a0f27120c -webrtc;https://github.com/status-im/nim-webrtc.git@#7dfb18eefdbd2e722cdec6d6a7b255015365d308 +webrtc;https://github.com/status-im/nim-webrtc.git@#0504d863407c1d14bf39478ba2335d299caac6ef websock;https://github.com/status-im/nim-websock@#f8ed9b40a5ff27ad02a3c237c4905b0924e3f982 zlib;https://github.com/status-im/nim-zlib@#a2f44bb7f65571a894227ff6fde9298a104e03a5 diff --git a/libp2p/transports/webrtctransport.nim b/libp2p/transports/webrtctransport.nim index d06a1795c2..4e673e8718 100644 --- a/libp2p/transports/webrtctransport.nim +++ b/libp2p/transports/webrtctransport.nim @@ -345,7 +345,7 @@ method start*( self.servers &= server let - cert = server.dtls.localCertificate + cert = server.dtlsLocalCertificate() certHash = MultiHash.digest("sha2-256", cert).get().data.buffer encodedCertHash = MultiBase.encode("base64", certHash).get() self.addrs[i] = (MultiAddress.init(server.udp.laddr, IPPROTO_UDP).tryGet() & MultiAddress.init(multiCodec("webrtc-direct")).tryGet() & MultiAddress.init(multiCodec("cert-hash"), encodedCertHash).tryGet()).tryGet() From f350479824c7d6ffcf6238e9bdef27e9629bc4bb Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Tue, 24 Oct 2023 17:20:27 +0200 Subject: [PATCH 05/18] update pinned + fixes --- .pinned | 2 +- libp2p/transports/webrtctransport.nim | 17 +++++++++-------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/.pinned b/.pinned index 661f150e5f..c42587ec1d 100644 --- a/.pinned +++ b/.pinned @@ -15,6 +15,6 @@ serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe540499 stew;https://github.com/status-im/nim-stew@#3159137d9a3110edb4024145ce0ba778975de40e testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34 unittest2;https://github.com/status-im/nim-unittest2@#2300fa9924a76e6c96bc4ea79d043e3a0f27120c -webrtc;https://github.com/status-im/nim-webrtc.git@#0504d863407c1d14bf39478ba2335d299caac6ef +webrtc;https://github.com/status-im/nim-webrtc.git@#a36708a5a0103f7dbbca85f7021eab864cc6cca0 websock;https://github.com/status-im/nim-websock@#f8ed9b40a5ff27ad02a3c237c4905b0924e3f982 zlib;https://github.com/status-im/nim-zlib@#a2f44bb7f65571a894227ff6fde9298a104e03a5 diff --git a/libp2p/transports/webrtctransport.nim b/libp2p/transports/webrtctransport.nim index 4e673e8718..529fbc176f 100644 --- a/libp2p/transports/webrtctransport.nim +++ b/libp2p/transports/webrtctransport.nim @@ -19,6 +19,8 @@ import transport, ../errors, ../wire, ../multicodec, + ../multihash, + ../multibase, ../protobuf/minprotobuf, ../connmanager, ../muxers/muxer, @@ -28,7 +30,7 @@ import transport, ../protocols/secure/noise, ../utility -import webrtc/webrtc, webrtc/datachannel +import webrtc/webrtc, webrtc/datachannel, webrtc/dtls/dtls logScope: topics = "libp2p webrtctransport" @@ -345,10 +347,12 @@ method start*( self.servers &= server let - cert = server.dtlsLocalCertificate() + cert = server.dtls.localCertificate() certHash = MultiHash.digest("sha2-256", cert).get().data.buffer encodedCertHash = MultiBase.encode("base64", certHash).get() - self.addrs[i] = (MultiAddress.init(server.udp.laddr, IPPROTO_UDP).tryGet() & MultiAddress.init(multiCodec("webrtc-direct")).tryGet() & MultiAddress.init(multiCodec("cert-hash"), encodedCertHash).tryGet()).tryGet() + self.addrs[i] = MultiAddress.init(server.udp.laddr, IPPROTO_UDP).tryGet() & + MultiAddress.init(multiCodec("webrtc-direct")).tryGet() & + MultiAddress.init(multiCodec("certhash"), certHash).tryGet() trace "Listening on", address = self.addrs[i] @@ -377,17 +381,14 @@ proc connHandler(self: WebRtcTransport, for f in futs: if not f.finished: await f.cancelAndWait() # cancel outstanding join() - trace "Cleaning up client", addrs = $client.remoteAddress, - conn + trace "Cleaning up client"# TODO ?: , addrs = $client.remoteAddress, + # conn self.clients[dir].keepItIf( it != client ) #TODO #await allFuturesThrowing( # conn.close(), client.closeWait()) - trace "Cleaned up client", addrs = $client.remoteAddress, - conn - except CatchableError as exc: let useExc {.used.} = exc debug "Error cleaning up client", errMsg = exc.msg, conn From ad43f41ad7f22be6d0881f3b34d224fddc740eb9 Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Tue, 7 Nov 2023 10:27:36 +0100 Subject: [PATCH 06/18] fix hashBtS --- libp2p/multiaddress.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/multiaddress.nim b/libp2p/multiaddress.nim index 38835f6071..159e864227 100644 --- a/libp2p/multiaddress.nim +++ b/libp2p/multiaddress.nim @@ -312,7 +312,7 @@ proc certHashBtS(vb: var VBuffer, s: var string): bool = if vb.readSeq(address) > 0: var mh: MultiHash if MultiHash.decode(address, mh).isOk: - s = MultiBase.encode("base64", address).valueOr: + s = MultiBase.encode("base64url", address).valueOr: return false result = true From dab487eeb387c2455ae9fbca49866bd629f63dd8 Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Thu, 9 Nov 2023 15:56:04 +0100 Subject: [PATCH 07/18] update pinned --- .pinned | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pinned b/.pinned index c42587ec1d..8d807679d0 100644 --- a/.pinned +++ b/.pinned @@ -15,6 +15,6 @@ serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe540499 stew;https://github.com/status-im/nim-stew@#3159137d9a3110edb4024145ce0ba778975de40e testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34 unittest2;https://github.com/status-im/nim-unittest2@#2300fa9924a76e6c96bc4ea79d043e3a0f27120c -webrtc;https://github.com/status-im/nim-webrtc.git@#a36708a5a0103f7dbbca85f7021eab864cc6cca0 +webrtc;https://github.com/status-im/nim-webrtc.git@#6391a3f2e58c269bbcad8ad53770efac64ede9a0 websock;https://github.com/status-im/nim-websock@#f8ed9b40a5ff27ad02a3c237c4905b0924e3f982 zlib;https://github.com/status-im/nim-zlib@#a2f44bb7f65571a894227ff6fde9298a104e03a5 From 284188a74f614a9d576ef892a646bc78095d1c70 Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Thu, 16 Nov 2023 16:05:41 +0100 Subject: [PATCH 08/18] update pinned --- .pinned | 2 +- libp2p/transports/webrtctransport.nim | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.pinned b/.pinned index 8d807679d0..32e4e66bda 100644 --- a/.pinned +++ b/.pinned @@ -15,6 +15,6 @@ serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe540499 stew;https://github.com/status-im/nim-stew@#3159137d9a3110edb4024145ce0ba778975de40e testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34 unittest2;https://github.com/status-im/nim-unittest2@#2300fa9924a76e6c96bc4ea79d043e3a0f27120c -webrtc;https://github.com/status-im/nim-webrtc.git@#6391a3f2e58c269bbcad8ad53770efac64ede9a0 +webrtc;https://github.com/status-im/nim-webrtc.git@#070aa1185433bfb449f3f07c188da82597e4f531 websock;https://github.com/status-im/nim-websock@#f8ed9b40a5ff27ad02a3c237c4905b0924e3f982 zlib;https://github.com/status-im/nim-zlib@#a2f44bb7f65571a894227ff6fde9298a104e03a5 diff --git a/libp2p/transports/webrtctransport.nim b/libp2p/transports/webrtctransport.nim index 529fbc176f..0d88a10973 100644 --- a/libp2p/transports/webrtctransport.nim +++ b/libp2p/transports/webrtctransport.nim @@ -414,6 +414,7 @@ method accept*(self: WebRtcTransport): Future[Connection] {.async, gcsafe.} = index = self.acceptFuts.find(finished) self.acceptFuts[index] = self.servers[index].accept() + trace "Accept WebRTC Transport" let transp = await finished try: From 7945cc754e3fb56ee0b2557b9382572ce1bdb625 Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Thu, 23 Nov 2023 15:00:56 +0100 Subject: [PATCH 09/18] add prologue/remote & local cert to the handshake --- .pinned | 2 +- libp2p/protocols/secure/noise.nim | 2 +- libp2p/transports/webrtctransport.nim | 5 +++++ 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/.pinned b/.pinned index 32e4e66bda..1747f6e55d 100644 --- a/.pinned +++ b/.pinned @@ -15,6 +15,6 @@ serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe540499 stew;https://github.com/status-im/nim-stew@#3159137d9a3110edb4024145ce0ba778975de40e testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34 unittest2;https://github.com/status-im/nim-unittest2@#2300fa9924a76e6c96bc4ea79d043e3a0f27120c -webrtc;https://github.com/status-im/nim-webrtc.git@#070aa1185433bfb449f3f07c188da82597e4f531 +webrtc;https://github.com/status-im/nim-webrtc.git@#7d14cdcb48e24f6e1736d0b335e4e98bbd9221fb websock;https://github.com/status-im/nim-websock@#f8ed9b40a5ff27ad02a3c237c4905b0924e3f982 zlib;https://github.com/status-im/nim-zlib@#a2f44bb7f65571a894227ff6fde9298a104e03a5 diff --git a/libp2p/protocols/secure/noise.nim b/libp2p/protocols/secure/noise.nim index 4c3de72b7e..7d4a71842a 100644 --- a/libp2p/protocols/secure/noise.nim +++ b/libp2p/protocols/secure/noise.nim @@ -82,7 +82,7 @@ type localPrivateKey: PrivateKey localPublicKey: seq[byte] noiseKeys: KeyPair - commonPrologue: seq[byte] + commonPrologue*: seq[byte] outgoing: bool NoiseConnection* = ref object of SecureConn diff --git a/libp2p/transports/webrtctransport.nim b/libp2p/transports/webrtctransport.nim index 0d88a10973..4bb78d5500 100644 --- a/libp2p/transports/webrtctransport.nim +++ b/libp2p/transports/webrtctransport.nim @@ -257,6 +257,11 @@ method upgrade*( let noiseHandler = self.secureManagers.filterIt(it of Noise) assert noiseHandler.len > 0 + let xx = "libp2p-webrtc-noise:".toBytes() + let localCert = MultiHash.digest("sha2-256", webRtcConn.connection.conn.conn.localCert).get().data.buffer + let remoteCert = MultiHash.digest("sha2-256", webRtcConn.connection.conn.conn.remoteCertificate()).get().data.buffer + ((Noise)noiseHandler[0]).commonPrologue = xx & remoteCert & localCert + let stream = await webRtcConn.getStream(Out) #TODO add channelId: 0 secureStream = await noiseHandler[0].handshake( From 359a448c1b40a9c24addefc7bbc2b5244bd0d9a1 Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Wed, 29 Nov 2023 14:53:01 +0100 Subject: [PATCH 10/18] update pinned & fix localCertificate --- .pinned | 2 +- libp2p/transports/webrtctransport.nim | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.pinned b/.pinned index 1747f6e55d..58df7d7e30 100644 --- a/.pinned +++ b/.pinned @@ -15,6 +15,6 @@ serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe540499 stew;https://github.com/status-im/nim-stew@#3159137d9a3110edb4024145ce0ba778975de40e testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34 unittest2;https://github.com/status-im/nim-unittest2@#2300fa9924a76e6c96bc4ea79d043e3a0f27120c -webrtc;https://github.com/status-im/nim-webrtc.git@#7d14cdcb48e24f6e1736d0b335e4e98bbd9221fb +webrtc;https://github.com/status-im/nim-webrtc.git@#ec51a19880f533d2f0afc9e79fa9894f81f25c40 websock;https://github.com/status-im/nim-websock@#f8ed9b40a5ff27ad02a3c237c4905b0924e3f982 zlib;https://github.com/status-im/nim-zlib@#a2f44bb7f65571a894227ff6fde9298a104e03a5 diff --git a/libp2p/transports/webrtctransport.nim b/libp2p/transports/webrtctransport.nim index 4bb78d5500..0ef714edb3 100644 --- a/libp2p/transports/webrtctransport.nim +++ b/libp2p/transports/webrtctransport.nim @@ -258,9 +258,10 @@ method upgrade*( assert noiseHandler.len > 0 let xx = "libp2p-webrtc-noise:".toBytes() - let localCert = MultiHash.digest("sha2-256", webRtcConn.connection.conn.conn.localCert).get().data.buffer + let localCert = MultiHash.digest("sha2-256", webRtcConn.connection.conn.conn.localCertificate()).get().data.buffer let remoteCert = MultiHash.digest("sha2-256", webRtcConn.connection.conn.conn.remoteCertificate()).get().data.buffer ((Noise)noiseHandler[0]).commonPrologue = xx & remoteCert & localCert + echo "=> ", ((Noise)noiseHandler[0]).commonPrologue let stream = await webRtcConn.getStream(Out) #TODO add channelId: 0 From 58294ce15698f5df3efbc30e29d1d62d26e990ae Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Thu, 7 Dec 2023 10:00:16 +0100 Subject: [PATCH 11/18] update pinned --- .pinned | 2 +- libp2p/transports/webrtctransport.nim | 9 ++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/.pinned b/.pinned index 58df7d7e30..1ac94dfb36 100644 --- a/.pinned +++ b/.pinned @@ -15,6 +15,6 @@ serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe540499 stew;https://github.com/status-im/nim-stew@#3159137d9a3110edb4024145ce0ba778975de40e testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34 unittest2;https://github.com/status-im/nim-unittest2@#2300fa9924a76e6c96bc4ea79d043e3a0f27120c -webrtc;https://github.com/status-im/nim-webrtc.git@#ec51a19880f533d2f0afc9e79fa9894f81f25c40 +webrtc;https://github.com/status-im/nim-webrtc.git@#525fb37882397dead197152e0e49af38796940b5 websock;https://github.com/status-im/nim-websock@#f8ed9b40a5ff27ad02a3c237c4905b0924e3f982 zlib;https://github.com/status-im/nim-zlib@#a2f44bb7f65571a894227ff6fde9298a104e03a5 diff --git a/libp2p/transports/webrtctransport.nim b/libp2p/transports/webrtctransport.nim index 0ef714edb3..28057330f4 100644 --- a/libp2p/transports/webrtctransport.nim +++ b/libp2p/transports/webrtctransport.nim @@ -141,6 +141,7 @@ method write*(s: WebRtcStream, msg2: seq[byte]): Future[void] = return retFuture proc actuallyClose(s: WebRtcStream) {.async.} = + debug "stream closed" if s.rxState == Closed and s.txState == Closed and s.readData.len == 0: #TODO add support to DataChannel #await s.dataChannel.close() @@ -269,7 +270,6 @@ method upgrade*( stream, initiator = true, # we are always the initiator in webrtc-direct peerId = peerId - #TODO: add prelude data ) # Peer proved its identity, we can close this @@ -367,10 +367,9 @@ proc connHandler(self: WebRtcTransport, observedAddr: Opt[MultiAddress], dir: Direction): Future[Connection] {.async.} = - trace "Handling ws connection", address = $observedAddr, - dir = $dir, - clients = self.clients[Direction.In].len + - self.clients[Direction.Out].len + trace "Handling webrtc connection", address = $observedAddr, dir = $dir, + clients = self.clients[Direction.In].len + + self.clients[Direction.Out].len let conn: Connection = WebRtcConnection.new( From 60d48e644b867ffb5104c4ceb94d376b2bcde16b Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Fri, 15 Dec 2023 09:54:17 +0100 Subject: [PATCH 12/18] update pinned --- .pinned | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pinned b/.pinned index 1ac94dfb36..62ff9bb795 100644 --- a/.pinned +++ b/.pinned @@ -15,6 +15,6 @@ serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe540499 stew;https://github.com/status-im/nim-stew@#3159137d9a3110edb4024145ce0ba778975de40e testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34 unittest2;https://github.com/status-im/nim-unittest2@#2300fa9924a76e6c96bc4ea79d043e3a0f27120c -webrtc;https://github.com/status-im/nim-webrtc.git@#525fb37882397dead197152e0e49af38796940b5 +webrtc;https://github.com/status-im/nim-webrtc.git@#9a6657922ace51a93feb57520b12d9ab5a550a13 websock;https://github.com/status-im/nim-websock@#f8ed9b40a5ff27ad02a3c237c4905b0924e3f982 zlib;https://github.com/status-im/nim-zlib@#a2f44bb7f65571a894227ff6fde9298a104e03a5 From 03ff023e9463abcea674cddcde83bc5731e08c28 Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Mon, 5 Feb 2024 17:44:35 +0100 Subject: [PATCH 13/18] fix webrtcstream --- libp2p/transports/webrtctransport.nim | 43 ++++++++++++++++++++++++--- testwebrtc.nim | 22 ++++++++++++++ 2 files changed, 61 insertions(+), 4 deletions(-) diff --git a/libp2p/transports/webrtctransport.nim b/libp2p/transports/webrtctransport.nim index 28057330f4..d5d8392047 100644 --- a/libp2p/transports/webrtctransport.nim +++ b/libp2p/transports/webrtctransport.nim @@ -77,6 +77,39 @@ proc encode(msg: WebRtcMessage): seq[byte] = pb.finish() pb.buffer +# -- Raw WebRTC Stream -- + +type + RawWebRtcStream = ref object of Connection + dataChannel: DataChannelStream + readData: seq[byte] + +proc new(_: type RawWebRtcStream, dataChannel: DataChannelStream): RawWebRtcStream = + let stream = RawWebRtcStream(dataChannel: dataChannel) + stream + +method closeImpl*(s: RawWebRtcStream): Future[void] = + # TODO: close datachannel + discard + +method write*(s: RawWebRtcStream, msg: seq[byte]): Future[void] = + trace "RawWebrtcStream write", msg, len=msg.len() + s.dataChannel.write(msg) + +method readOnce*(s: RawWebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.async.} = + # TODO: + # if s.isClosed: + # raise newLPStreamEOFError() + + if s.readData.len() == 0: + let rawData = await s.dataChannel.read() + s.readData = rawData + trace "readOnce RawWebRtcStream", data = s.readData, nbytes + + result = min(nbytes, s.readData.len) + copyMem(pbytes, addr s.readData[0], result) + s.readData = s.readData[result..^1] + # -- Stream -- const MaxMessageSize = 16384 # 16KiB @@ -85,7 +118,7 @@ type Sending, Closing, Closed WebRtcStream = ref object of Connection - dataChannel: DataChannelStream + rawStream: RawWebRtcStream sendQueue: seq[(seq[byte], Future[void])] sendLoop: Future[void] readData: seq[byte] @@ -97,7 +130,8 @@ proc new( dataChannel: DataChannelStream, oaddr: Opt[MultiAddress], peerId: PeerId): WebRtcStream = - let stream = WebRtcStream(dataChannel: dataChannel, observedAddr: oaddr, peerId: peerId) + let stream = WebRtcStream(rawStream: RawWebRtcStream.new(dataChannel), + observedAddr: oaddr, peerId: peerId) procCall Connection(stream).initStream() stream @@ -105,7 +139,7 @@ proc sender(s: WebRtcStream) {.async.} = while s.sendQueue.len > 0: let (message, fut) = s.sendQueue.pop() #TODO handle exceptions - await s.dataChannel.write(message) + await s.rawStream.writeLp(message) if not fut.isNil: fut.complete() proc send(s: WebRtcStream, msg: WebRtcMessage, fut: Future[void] = nil) = @@ -121,6 +155,7 @@ method write*(s: WebRtcStream, msg2: seq[byte]): Future[void] = # We avoid this by filling the s.sendQueue synchronously var msg = msg2 + trace "WebrtcStream write", msg, len=msg.len() let retFuture = newFuture[void]("WebRtcStream.write") if s.txState != Sending: retFuture.fail(newLPStreamClosedError()) @@ -158,7 +193,7 @@ method readOnce*(s: WebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.a let #TODO handle exceptions - message = await s.dataChannel.read() + message = await s.rawStream.readLp(MaxMessageSize) decoded = WebRtcMessage.decode(message).tryGet() decoded.flag.withValue(flag): diff --git a/testwebrtc.nim b/testwebrtc.nim index 17fb6f60a4..efedbd56ad 100644 --- a/testwebrtc.nim +++ b/testwebrtc.nim @@ -1,4 +1,18 @@ import chronos, libp2p, libp2p/transports/webrtctransport +import stew/byteutils + +proc echoHandler(conn: Connection, proto: string) {.async.} = + defer: await conn.close() + while true: + try: + echo "\e[35;1m => Echo Handler <=\e[0m" + let msg = string.fromBytes(await conn.readLp(1024)) + echo " => Echo Handler Receive: ", msg, " <=" + echo " => Echo Handler Try Send: ", msg & "1", " <=" + await conn.writeLp(msg & "1") + except CatchableError as e: + echo " => Echo Handler Error: ", e.msg, " <=" + break proc main {.async.} = let switch = @@ -10,7 +24,15 @@ proc main {.async.} = .withNoise() .build() + let + codec = "/echo/1.0.0" + proto = new LPProtocol + proto.handler = echoHandler + proto.codec = codec + + switch.mount(proto) await switch.start() + echo "\e[31;1m", $(switch.peerInfo.addrs[0]), "/p2p/", $(switch.peerInfo.peerId), "\e[0m" await sleepAsync(1.hours) waitFor main() From afe2b081299af9a07b9c9509a2496fbbe57f4fd7 Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Thu, 15 Feb 2024 16:10:20 +0100 Subject: [PATCH 14/18] Fix a lot of small bugs --- libp2p/transports/webrtctransport.nim | 54 ++++++++++++++++++++------- testwebrtc.nim | 8 +++- 2 files changed, 47 insertions(+), 15 deletions(-) diff --git a/libp2p/transports/webrtctransport.nim b/libp2p/transports/webrtctransport.nim index d5d8392047..19d09df4dc 100644 --- a/libp2p/transports/webrtctransport.nim +++ b/libp2p/transports/webrtctransport.nim @@ -122,8 +122,8 @@ type sendQueue: seq[(seq[byte], Future[void])] sendLoop: Future[void] readData: seq[byte] - txState: WebRtcState - rxState: WebRtcState + txState: WebRtcState # Transmission + rxState: WebRtcState # Reception proc new( _: type WebRtcStream, @@ -176,7 +176,7 @@ method write*(s: WebRtcStream, msg2: seq[byte]): Future[void] = return retFuture proc actuallyClose(s: WebRtcStream) {.async.} = - debug "stream closed" + debug "stream closed", rxState=s.rxState, txState=s.txState if s.rxState == Closed and s.txState == Closed and s.readData.len == 0: #TODO add support to DataChannel #await s.dataChannel.close() @@ -186,7 +186,9 @@ method readOnce*(s: WebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.a if s.rxState == Closed: raise newLPStreamEOFError() - while s.readData.len == 0: + while s.readData.len == 0 or nbytes == 0: + # Check if there's no data left in readData or if nbytes is equal to 0 + # in order to read an eventual Fin or FinAck if s.rxState == Closed: await s.actuallyClose() return 0 @@ -196,6 +198,8 @@ method readOnce*(s: WebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.a message = await s.rawStream.readLp(MaxMessageSize) decoded = WebRtcMessage.decode(message).tryGet() + s.readData = s.readData.concat(decoded.data) + decoded.flag.withValue(flag): case flag: of Fin: @@ -205,10 +209,10 @@ method readOnce*(s: WebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.a of FinAck: s.txState = Closed await s.actuallyClose() + if nbytes == 0: + return 0 else: discard - s.readData = decoded.data - result = min(nbytes, s.readData.len) copyMem(pbytes, addr s.readData[0], result) s.readData = s.readData[result..^1] @@ -216,7 +220,8 @@ method readOnce*(s: WebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.a method closeImpl*(s: WebRtcStream) {.async.} = s.send(WebRtcMessage(flag: Opt.some(Fin))) s.txState = Closing - await s.join() #TODO ?? + while s.txState != Closed: + discard await s.readOnce(nil, 0) # -- Connection -- type WebRtcConnection = ref object of Connection @@ -236,13 +241,15 @@ proc new( co proc getStream*(conn: WebRtcConnection, - direction: Direction): Future[WebRtcStream] {.async.} = + direction: Direction, + noiseHandshake: bool = false): Future[WebRtcStream] {.async.} = var datachannel = case direction: of Direction.In: await conn.connection.accept() of Direction.Out: - await conn.connection.openStream(0) #TODO don't hardcode stream id (should be in nim-webrtc) + #TODO don't hardcode stream id (should be in nim-webrtc) + await conn.connection.openStream(noiseHandshake) return WebRtcStream.new(datachannel, conn.observedAddr, conn.peerId) # -- Muxer -- @@ -278,7 +285,10 @@ method close*(m: WebRtcMuxer) {.async, gcsafe.} = await m.webRtcConn.close() # -- Upgrader -- -type WebRtcUpgrade = ref object of Upgrade +type + WebRtcStreamHandler = proc(conn: Connection): Future[void] {.gcsafe, raises: [].} + WebRtcUpgrade = ref object of Upgrade + streamHandler: WebRtcStreamHandler method upgrade*( self: WebRtcUpgrade, @@ -287,7 +297,7 @@ method upgrade*( peerId: Opt[PeerId]): Future[Muxer] {.async.} = let webRtcConn = WebRtcConnection(conn) - result = WebRtcMuxer(webRtcConn: webRtcConn) + result = WebRtcMuxer(connection: conn, webRtcConn: webRtcConn) # Noise handshake let noiseHandler = self.secureManagers.filterIt(it of Noise) @@ -300,7 +310,7 @@ method upgrade*( echo "=> ", ((Noise)noiseHandler[0]).commonPrologue let - stream = await webRtcConn.getStream(Out) #TODO add channelId: 0 + stream = await webRtcConn.getStream(Out, true) #TODO add channelId: 0 secureStream = await noiseHandler[0].handshake( stream, initiator = true, # we are always the initiator in webrtc-direct @@ -311,6 +321,9 @@ method upgrade*( await secureStream.close() await stream.close() + result.streamHandler = self.streamHandler + result.handler = result.handle() + # -- Transport -- type WebRtcTransport* = ref object of Transport @@ -354,9 +367,24 @@ proc new*( upgrade: Upgrade, connectionsTimeout = 10.minutes): T {.public.} = + let upgrader = WebRtcUpgrade(ms: upgrade.ms, secureManagers: upgrade.secureManagers) + upgrader.streamHandler = proc(conn: Connection) + {.async, gcsafe, raises: [].} = + # TODO: replace echo by trace and find why it fails compiling + echo "Starting stream handler"#, conn + try: + await upgrader.ms.handle(conn) # handle incoming connection + except CancelledError as exc: + raise exc + except CatchableError as exc: + echo "exception in stream handler", exc.msg#, conn, msg = exc.msg + finally: + await conn.closeWithEOF() + echo "Stream handler done"#, conn + let transport = T( - upgrader: WebRtcUpgrade(secureManagers: upgrade.secureManagers), + upgrader: upgrader, connectionsTimeout: connectionsTimeout) return transport diff --git a/testwebrtc.nim b/testwebrtc.nim index efedbd56ad..461a910ac4 100644 --- a/testwebrtc.nim +++ b/testwebrtc.nim @@ -6,10 +6,13 @@ proc echoHandler(conn: Connection, proto: string) {.async.} = while true: try: echo "\e[35;1m => Echo Handler <=\e[0m" - let msg = string.fromBytes(await conn.readLp(1024)) + var xx = newSeq[byte](1024) + let aa = await conn.readOnce(addr xx[0], 1024) + xx = xx[0.. Echo Handler Receive: ", msg, " <=" echo " => Echo Handler Try Send: ", msg & "1", " <=" - await conn.writeLp(msg & "1") + await conn.write(msg & "1") except CatchableError as e: echo " => Echo Handler Error: ", e.msg, " <=" break @@ -20,6 +23,7 @@ proc main {.async.} = .withAddress(MultiAddress.init("/ip4/127.0.0.1/udp/4242/webrtc-direct/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g").tryGet()) #TODO the certhash shouldn't be necessary .withRng(crypto.newRng()) .withMplex() + .withYamux() .withTransport(proc (upgr: Upgrade): Transport = WebRtcTransport.new(upgr)) .withNoise() .build() From abd3653d5600aeace099c7a5b05dc1f86d9113bb Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Wed, 6 Mar 2024 16:53:48 +0100 Subject: [PATCH 15/18] update commit --- .pinned | 4 ++-- libp2p/transports/webrtctransport.nim | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.pinned b/.pinned index 62ff9bb795..9b12a0a956 100644 --- a/.pinned +++ b/.pinned @@ -6,7 +6,7 @@ dnsclient;https://github.com/ba0f3/dnsclient.nim@#23214235d4784d24aceed99bbfe153 faststreams;https://github.com/status-im/nim-faststreams@#720fc5e5c8e428d9d0af618e1e27c44b42350309 httputils;https://github.com/status-im/nim-http-utils@#87b7cbf032c90b9e6b446081f4a647e950362cec json_serialization;https://github.com/status-im/nim-json-serialization@#85b7ea093cb85ee4f433a617b97571bd709d30df -mbedtls;https://github.com/status-im/nim-mbedtls.git@#1167c90f1e5c1f4872868720140ff5b0840d18be +mbedtls;https://github.com/status-im/nim-mbedtls.git@#308f3edaa0edcc880b54ce22156fb2f4e2a2bcc7 metrics;https://github.com/status-im/nim-metrics@#6142e433fc8ea9b73379770a788017ac528d46ff nimcrypto;https://github.com/cheatfate/nimcrypto@#a079df92424968d46a6ac258299ce9380aa153f2 results;https://github.com/arnetheduck/nim-results@#f3c666a272c69d70cb41e7245e7f6844797303ad @@ -15,6 +15,6 @@ serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe540499 stew;https://github.com/status-im/nim-stew@#3159137d9a3110edb4024145ce0ba778975de40e testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34 unittest2;https://github.com/status-im/nim-unittest2@#2300fa9924a76e6c96bc4ea79d043e3a0f27120c -webrtc;https://github.com/status-im/nim-webrtc.git@#9a6657922ace51a93feb57520b12d9ab5a550a13 +webrtc;https://github.com/status-im/nim-webrtc.git@#d525da3d62ed65e989d782e4cbb7edf221128568 websock;https://github.com/status-im/nim-websock@#f8ed9b40a5ff27ad02a3c237c4905b0924e3f982 zlib;https://github.com/status-im/nim-zlib@#a2f44bb7f65571a894227ff6fde9298a104e03a5 diff --git a/libp2p/transports/webrtctransport.nim b/libp2p/transports/webrtctransport.nim index 19d09df4dc..ae6564b4c5 100644 --- a/libp2p/transports/webrtctransport.nim +++ b/libp2p/transports/webrtctransport.nim @@ -285,6 +285,7 @@ method close*(m: WebRtcMuxer) {.async, gcsafe.} = await m.webRtcConn.close() # -- Upgrader -- + type WebRtcStreamHandler = proc(conn: Connection): Future[void] {.gcsafe, raises: [].} WebRtcUpgrade = ref object of Upgrade From c327762f476244fc66fbd9197072b311766f99b2 Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Fri, 22 Mar 2024 14:41:31 +0100 Subject: [PATCH 16/18] Add comments & remove TODO already done --- libp2p/transports/webrtctransport.nim | 44 +++++++++++++++++++-------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/libp2p/transports/webrtctransport.nim b/libp2p/transports/webrtctransport.nim index ae6564b4c5..5932f7baa7 100644 --- a/libp2p/transports/webrtctransport.nim +++ b/libp2p/transports/webrtctransport.nim @@ -39,10 +39,21 @@ export transport, results const WebRtcTransportTrackerName* = "libp2p.webrtctransport" + MaxMessageSize = 16384 # 16KiB; from the WebRtc-direct spec # -- Message -- +# Implementation of the libp2p's WebRTC message defined here: +# https://github.com/libp2p/specs/blob/master/webrtc/README.md?plain=1#L60-L79 + type MessageFlag = enum + ## Flags to support half-closing and reset of streams. + ## - Fin: Sender will no longer send messages + ## - StopSending: Sender will no longer read messages. + ## Received messages are discarded + ## - ResetStream: Sender abruptly terminates the sending part of the stream. + ## Receiver MAY discard any data that it already received on that stream + ## - FinAck: Acknowledges the previous receipt of a message with the Fin flag set. Fin = 0 StopSending = 1 ResetStream = 2 @@ -53,6 +64,7 @@ type data: seq[byte] proc decode(_: type WebRtcMessage, bytes: seq[byte]): Opt[WebRtcMessage] = + ## Decoding WebRTC Message from raw data var pb = initProtoBuffer(bytes) flagOrd: uint32 @@ -66,6 +78,7 @@ proc decode(_: type WebRtcMessage, bytes: seq[byte]): Opt[WebRtcMessage] = Opt.some(res) proc encode(msg: WebRtcMessage): seq[byte] = + ## Encoding WebRTC Message to raw data var pb = initProtoBuffer() msg.flag.withValue(val): @@ -78,6 +91,11 @@ proc encode(msg: WebRtcMessage): seq[byte] = pb.buffer # -- Raw WebRTC Stream -- +# All the data written to or read from a WebRtcStream should be length-prefixed +# so `readOnce`/`write` WebRtcStream implementation must either recode +# `readLP`/`writeLP`, or implement a `RawWebRtcStream` on which we can +# directly use `readLP` and `writeLP`. The second solution is the less redundant, +# so it's the one we've chosen. type RawWebRtcStream = ref object of Connection @@ -85,8 +103,7 @@ type readData: seq[byte] proc new(_: type RawWebRtcStream, dataChannel: DataChannelStream): RawWebRtcStream = - let stream = RawWebRtcStream(dataChannel: dataChannel) - stream + RawWebRtcStream(dataChannel: dataChannel) method closeImpl*(s: RawWebRtcStream): Future[void] = # TODO: close datachannel @@ -111,7 +128,6 @@ method readOnce*(s: RawWebRtcStream, pbytes: pointer, nbytes: int): Future[int] s.readData = s.readData[result..^1] # -- Stream -- -const MaxMessageSize = 16384 # 16KiB type WebRtcState = enum @@ -153,7 +169,6 @@ method write*(s: WebRtcStream, msg2: seq[byte]): Future[void] = # We need to make sure we send all of our data before another write # Otherwise, two concurrent writes could get intertwined # We avoid this by filling the s.sendQueue synchronously - var msg = msg2 trace "WebrtcStream write", msg, len=msg.len() let retFuture = newFuture[void]("WebRtcStream.write") @@ -224,8 +239,10 @@ method closeImpl*(s: WebRtcStream) {.async.} = discard await s.readOnce(nil, 0) # -- Connection -- + type WebRtcConnection = ref object of Connection connection: DataChannelConnection + remoteAddress: MultiAddress method close*(conn: WebRtcConnection) {.async.} = #TODO @@ -248,11 +265,11 @@ proc getStream*(conn: WebRtcConnection, of Direction.In: await conn.connection.accept() of Direction.Out: - #TODO don't hardcode stream id (should be in nim-webrtc) await conn.connection.openStream(noiseHandshake) return WebRtcStream.new(datachannel, conn.observedAddr, conn.peerId) # -- Muxer -- + type WebRtcMuxer = ref object of Muxer webRtcConn: WebRtcConnection handleFut: Future[void] @@ -311,7 +328,7 @@ method upgrade*( echo "=> ", ((Noise)noiseHandler[0]).commonPrologue let - stream = await webRtcConn.getStream(Out, true) #TODO add channelId: 0 + stream = await webRtcConn.getStream(Out, true) secureStream = await noiseHandler[0].handshake( stream, initiator = true, # we are always the initiator in webrtc-direct @@ -326,6 +343,7 @@ method upgrade*( result.handler = result.handle() # -- Transport -- + type WebRtcTransport* = ref object of Transport connectionsTimeout: Duration @@ -426,14 +444,14 @@ method start*( trace "Listening on", address = self.addrs[i] -proc connHandler(self: WebRtcTransport, - client: DataChannelConnection, - observedAddr: Opt[MultiAddress], - dir: Direction): Future[Connection] {.async.} = - +proc connHandler( + self: WebRtcTransport, + client: DataChannelConnection, + observedAddr: Opt[MultiAddress], + dir: Direction + ): Future[Connection] {.async.} = trace "Handling webrtc connection", address = $observedAddr, dir = $dir, - clients = self.clients[Direction.In].len + - self.clients[Direction.Out].len + clients = self.clients[Direction.In].len + self.clients[Direction.Out].len let conn: Connection = WebRtcConnection.new( From 8b9f34959bccce13302f404c3927e164bdc414be Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Wed, 3 Apr 2024 16:36:06 +0200 Subject: [PATCH 17/18] remove trailing space --- libp2p/transports/webrtctransport.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/transports/webrtctransport.nim b/libp2p/transports/webrtctransport.nim index 5932f7baa7..8f3a76b379 100644 --- a/libp2p/transports/webrtctransport.nim +++ b/libp2p/transports/webrtctransport.nim @@ -93,7 +93,7 @@ proc encode(msg: WebRtcMessage): seq[byte] = # -- Raw WebRTC Stream -- # All the data written to or read from a WebRtcStream should be length-prefixed # so `readOnce`/`write` WebRtcStream implementation must either recode -# `readLP`/`writeLP`, or implement a `RawWebRtcStream` on which we can +# `readLP`/`writeLP`, or implement a `RawWebRtcStream` on which we can # directly use `readLP` and `writeLP`. The second solution is the less redundant, # so it's the one we've chosen. From 9f90721d1246b2c8063ffa1b7d3c4ff22febfd10 Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Mon, 10 Jun 2024 15:57:40 +0200 Subject: [PATCH 18/18] feat: add proc genUfrag to generate a random username string --- libp2p/transports/webrtctransport.nim | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/libp2p/transports/webrtctransport.nim b/libp2p/transports/webrtctransport.nim index 8f3a76b379..b26be1c974 100644 --- a/libp2p/transports/webrtctransport.nim +++ b/libp2p/transports/webrtctransport.nim @@ -37,6 +37,14 @@ logScope: export transport, results +const charset = toSeq("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789/+".items) +proc genUfrag*(rng: ref HmacDrbgContext, size: int): seq[byte] = + # https://github.com/libp2p/specs/blob/master/webrtc/webrtc-direct.md?plain=1#L73-L77 + result = newSeq[byte](size) + for resultIndex in 0..