Skip to content

Commit

Permalink
make windowsize configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
lchenut committed Nov 17, 2023
1 parent 60f9536 commit 4a8446f
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 12 deletions.
4 changes: 2 additions & 2 deletions libp2p/builders.nim
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ proc withMplex*(
b.muxers.add(MuxerProvider.new(newMuxer, MplexCodec))
b

proc withYamux*(b: SwitchBuilder): SwitchBuilder =
proc newMuxer(conn: Connection): Muxer = Yamux.new(conn)
proc withYamux*(b: SwitchBuilder, windowSize: int = DefaultWindowSize): SwitchBuilder =
proc newMuxer(conn: Connection): Muxer = Yamux.new(conn, windowSize)

assert b.muxers.countIt(it.codec == YamuxCodec) == 0, "Yamux build multiple times"
b.muxers.add(MuxerProvider.new(newMuxer, YamuxCodec))
Expand Down
27 changes: 17 additions & 10 deletions libp2p/muxers/yamux/yamux.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ logScope:
const
YamuxCodec* = "/yamux/1.0.0"
YamuxVersion = 0.uint8
DefaultWindowSize = 256000
DefaultWindowSize* = 256000
MaxChannelCount = 200

when defined(libp2p_yamux_metrics):
Expand Down Expand Up @@ -351,7 +351,10 @@ proc open*(channel: YamuxChannel) {.async, gcsafe.} =
trace "Try to open channel twice"
return
channel.opened = true
await channel.conn.write(YamuxHeader.data(channel.id, 0, {if channel.isSrc: Syn else: Ack}))
await channel.conn.write(YamuxHeader.windowUpdate(
channel.id,
channel.maxRecvWindow.uint32 - DefaultWindowSize,
{if channel.isSrc: Syn else: Ack}))

method getWrapped*(channel: YamuxChannel): Connection = channel.conn

Expand All @@ -362,6 +365,7 @@ type
currentId: uint32
isClosed: bool
maxChannCount: int
windowSize: int

proc lenBySrc(m: Yamux, isSrc: bool): int =
for v in m.channels.values():
Expand All @@ -375,12 +379,12 @@ proc cleanupChann(m: Yamux, channel: YamuxChannel) {.async.} =
if channel.isReset and channel.recvWindow > 0:
m.flushed[channel.id] = channel.recvWindow

proc createStream(m: Yamux, id: uint32, isSrc: bool): YamuxChannel =
proc createStream(m: Yamux, id: uint32, isSrc: bool, windowSize: int): YamuxChannel =
result = YamuxChannel(
id: id,
maxRecvWindow: DefaultWindowSize,
recvWindow: DefaultWindowSize,
sendWindow: DefaultWindowSize,
maxRecvWindow: windowSize,
recvWindow: windowSize,
sendWindow: windowSize,
isSrc: isSrc,
conn: m.connection,
receivedData: newAsyncEvent(),
Expand Down Expand Up @@ -455,7 +459,7 @@ method handle*(m: Yamux) {.async, gcsafe.} =
m.flushed.del(header.streamId)
if header.streamId mod 2 == m.currentId mod 2:
raise newException(YamuxError, "Peer used our reserved stream id")
let newStream = m.createStream(header.streamId, false)
let newStream = m.createStream(header.streamId, false, m.windowSize)
if m.channels.len >= m.maxChannCount:
await newStream.reset()
continue
Expand Down Expand Up @@ -515,15 +519,18 @@ method newStream*(

if m.channels.len > m.maxChannCount - 1:
raise newException(TooManyChannels, "max allowed channel count exceeded")
let stream = m.createStream(m.currentId, true)
let stream = m.createStream(m.currentId, true, m.windowSize)
m.currentId += 2
if not lazy:
await stream.open()
return stream

proc new*(T: type[Yamux], conn: Connection, maxChannCount: int = MaxChannelCount): T =
proc new*(T: type[Yamux], conn: Connection,
maxChannCount: int = MaxChannelCount,
windowSize: int = DefaultWindowSize): T =
T(
connection: conn,
currentId: if conn.dir == Out: 1 else: 2,
maxChannCount: maxChannCount
maxChannCount: maxChannCount,
windowSize: if windowSize > DefaultWindowSize: windowSize else: DefaultWindowSize
)

0 comments on commit 4a8446f

Please sign in to comment.