Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Intelligent anti-bufferboat in picomux #52

Merged
merged 8 commits into from
Feb 13, 2025
39 changes: 39 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions libraries/picomux/Cargo.toml
Original file line number Diff line number Diff line change
@@ -48,3 +48,4 @@ socksv5 = "0.3"
sillad-sosistab3 = { path = "../sillad-sosistab3" }
tracing-subscriber = "0.3"
clap = { version = "4.5.8", features = ["derive"] }
argh = "0.1"
99 changes: 99 additions & 0 deletions libraries/picomux/examples/picomux-test/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::{
net::SocketAddr,
sync::Arc,
time::{Duration, Instant},
};

use futures_lite::{AsyncWriteExt, FutureExt as _};
use futures_util::AsyncReadExt;
use picomux::{LivenessConfig, PicoMux};
use rand::RngCore;
use sillad::dialer::{Dialer, DialerExt};

use crate::command::Command;

pub async fn client_main(connect: SocketAddr, sosistab3: Option<String>) -> anyhow::Result<()> {
let start = Instant::now();
let mut wire = if let Some(sosistab3) = sosistab3 {
sillad_sosistab3::dialer::SosistabDialer {
inner: sillad::tcp::TcpDialer { dest_addr: connect },
cookie: sillad_sosistab3::Cookie::new(&sosistab3),
}
.dynamic()
} else {
sillad::tcp::TcpDialer { dest_addr: connect }.dynamic()
}
.dial()
.await?;
eprintln!("wire dialed in {:?}", start.elapsed());

// loop {
// let mut buf = b"aaaaaaaaaaaaaaaaaaaaaaa".to_vec();

// wire.write_all(&buf).await?;
// }

let (read, write) = wire.split();
let mut mux = PicoMux::new(read, write);
mux.set_liveness(LivenessConfig {
ping_interval: Duration::from_secs(1),
timeout: Duration::from_secs(1000),
});
let mux = Arc::new(mux);

let start_ping = ping_once(mux.clone()).await?;
eprintln!("unloaded ping: {:?}", start_ping);
loop {
let ping_loop = async {
loop {
smol::Timer::after(Duration::from_secs(3)).await;
let ping = ping_once(mux.clone()).await?;
eprintln!(
"loaded ping: {:?}; bloat {:?}",
ping,
ping.saturating_sub(start_ping)
);
}
};
ping_loop.race(download_chunk(mux.clone())).await?;
}
}

async fn ping_once(mux: Arc<PicoMux>) -> anyhow::Result<Duration> {
let start = Instant::now();
const COUNT: u32 = 1;
for _ in 0..COUNT {
let stream = mux.open(&serde_json::to_vec(&Command::Source(1))?).await?;
futures_util::io::copy(stream, &mut futures_util::io::sink()).await?;
}
Ok(start.elapsed() / COUNT)
}

async fn download_chunk(mux: Arc<PicoMux>) -> anyhow::Result<()> {
const CHUNK_SIZE: usize = 1024 * 1024 * 1000;
eprintln!("**** starting chunk download, size {CHUNK_SIZE} ****");
let start = Instant::now();
let mut stream = mux
.open(&serde_json::to_vec(&Command::Source(CHUNK_SIZE))?)
.await?;
loop {
let start = Instant::now();
let n = futures_util::io::copy(
(&mut stream).take(10_000_000),
&mut futures_util::io::sink(),
)
.await?;
if n == 0 {
break;
}
eprintln!(
"*** current {:.2} Mbps",
80.0 / start.elapsed().as_secs_f64()
)
}
eprintln!(
"**** chunk download; total speed {:.2} Mbps ****",
(CHUNK_SIZE as f64 / start.elapsed().as_secs_f64()) / 1000.0 / 1000.0 * 8.0
);
Ok(())
}
7 changes: 7 additions & 0 deletions libraries/picomux/examples/picomux-test/command.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum Command {
Source(usize),
Sink(usize),
}
60 changes: 60 additions & 0 deletions libraries/picomux/examples/picomux-test/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
mod client;
mod command;
mod server;

use std::net::SocketAddr;

use argh::FromArgs;
use client::client_main;
use server::server_main;

/// picomux-test: A tool with server and client subcommands.
#[derive(FromArgs)]
struct PicomuxTest {
#[argh(subcommand)]
subcommand: Subcommand,
}

#[derive(FromArgs)]
#[argh(subcommand)]
enum Subcommand {
/// start the server
Server(ServerCmd),

/// start the client
Client(ClientCmd),
}

/// Start the server with a listening address.
#[derive(FromArgs)]
#[argh(subcommand, name = "server")]
struct ServerCmd {
/// address to listen on (e.g., 127.0.0.1:8080)
#[argh(option, long = "listen")]
listen: SocketAddr,
/// sosistab3 cookie for obfuscation
#[argh(option, long = "sosistab3")]
sosistab3: Option<String>,
}

/// Start the client with a connection address.
#[derive(FromArgs)]
#[argh(subcommand, name = "client")]
struct ClientCmd {
/// address to connect to (e.g., 127.0.0.1:8080)
#[argh(option, long = "connect")]
connect: SocketAddr,
/// sosistab3 cookie for obfuscation
#[argh(option, long = "sosistab3")]
sosistab3: Option<String>,
}

fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let picomux_test: PicomuxTest = argh::from_env();

match picomux_test.subcommand {
Subcommand::Server(cmd) => smolscale::block_on(server_main(cmd.listen, cmd.sosistab3)),
Subcommand::Client(cmd) => smolscale::block_on(client_main(cmd.connect, cmd.sosistab3)),
}
}
75 changes: 75 additions & 0 deletions libraries/picomux/examples/picomux-test/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use std::{net::SocketAddr, sync::atomic::AtomicU64};

use futures_util::{AsyncReadExt, AsyncWriteExt, TryFutureExt};
use picomux::{PicoMux, Stream};
use rand::RngCore;

use sillad::{listener::Listener, Pipe};

use crate::command::Command;

pub async fn server_main(listen: SocketAddr, sosistab3: Option<String>) -> anyhow::Result<()> {
if let Some(sosistab3) = sosistab3 {
let mut listener = sillad_sosistab3::listener::SosistabListener::new(
sillad::tcp::TcpListener::bind(listen).await?,
sillad_sosistab3::Cookie::new(&sosistab3),
);
loop {
let wire = listener.accept().await?;
smolscale::spawn(once_wire(wire).inspect_err(|err| eprintln!("wire died: {:?}", err)))
.detach();
}
} else {
let mut listener = sillad::tcp::TcpListener::bind(listen).await?;
loop {
let wire = listener.accept().await?;
smolscale::spawn(once_wire(wire).inspect_err(|err| eprintln!("wire died: {:?}", err)))
.detach();
}
}
}

async fn once_wire(mut wire: impl Pipe) -> anyhow::Result<()> {
static COUNTER: AtomicU64 = AtomicU64::new(0);
let wire_count = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
eprintln!("accepted wire {wire_count} from {:?}", wire.remote_addr());
// loop {
// let mut buf = [0u8; 1024];
// wire.read_exact(&mut buf).await?;
// eprintln!("gotten 1024 garbages");
// }

let (read_wire, write_wire) = wire.split();
let mux = PicoMux::new(read_wire, write_wire);
for stream_count in 0u64.. {
let stream = mux.accept().await?;
eprintln!("accepted stream {stream_count} from wire {wire_count}");
smolscale::spawn(
once_stream(wire_count, stream_count, stream).inspect_err(move |err| {
eprintln!("stream {wire_count}/{stream_count} died: {:?}", err)
}),
)
.detach();
}
unreachable!()
}

async fn once_stream(wire_count: u64, stream_count: u64, mut stream: Stream) -> anyhow::Result<()> {
let command: Command = serde_json::from_slice(stream.metadata())?;
eprintln!("{wire_count}/{stream_count} command {:?}", command);
match command {
Command::Source(mut len) => {
while len > 0 {
let n = len.min(65536);
let mut buff = vec![0u8; n];
rand::thread_rng().fill_bytes(&mut buff);
stream.write_all(&buff).await?;
len = len.saturating_sub(n);
}
}
Command::Sink(sink) => {
futures_util::io::copy(stream.take(sink as _), &mut futures_util::io::sink()).await?;
}
}
anyhow::Ok(())
}
32 changes: 32 additions & 0 deletions libraries/picomux/src/bdp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::time::Instant;

/// A properly antialiased calculator of the bandwidth.
pub struct BwEstimate {
accum: f64,
last_update: Instant,
}

impl BwEstimate {
pub fn new() -> Self {
Self {
accum: 0.0,
last_update: Instant::now(),
}
}

pub fn sample(&mut self, delta: usize) {
let now = Instant::now();
let delta_t = now
.saturating_duration_since(self.last_update)
.as_secs_f64();
let speed_sample = delta as f64 / delta_t;
let decay_ratio = 1.0 / 2.0f64.powf(delta_t); // decay exponentially
self.accum = self.accum * decay_ratio + speed_sample * (1.0 - decay_ratio);
self.last_update = now;
tracing::debug!("speed now {:.2} MB/s", self.accum / 1_000_000.0);
}

pub fn read(&self) -> f64 {
self.accum
}
}
9 changes: 9 additions & 0 deletions libraries/picomux/src/buffer_table.rs
Original file line number Diff line number Diff line change
@@ -67,6 +67,8 @@ impl BufferTable {
/// Waits until the send window for the given stream is at least 1, then decrement it by 1.
pub async fn wait_send_window(&self, stream_id: u32) {
let semaph = if let Some(inner) = self.inner.get(&stream_id) {
let before = inner.1.permits();
tracing::debug!(stream_id, before, "decrementing send window");
inner.1.clone()
} else {
futures_util::future::pending().await
@@ -77,6 +79,13 @@ impl BufferTable {
/// Increases the send window for the given stream.
pub fn incr_send_window(&self, stream_id: u32, amount: u16) {
if let Some(inner) = self.inner.get(&stream_id) {
let before = inner.1.permits();
tracing::debug!(
stream_id,
before,
after = display(amount as usize + before),
"increasing send window"
);
inner.1.release(amount as _);
}
}
51 changes: 31 additions & 20 deletions libraries/picomux/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod bdp;
mod buffer_table;
mod frame;

mod outgoing;

use std::{
@@ -21,6 +21,7 @@ use anyhow::Context;

use async_task::Task;

use bdp::BwEstimate;
use buffer_table::BufferTable;
use bytes::Bytes;
use frame::{Frame, CMD_FIN, CMD_MORE, CMD_NOP, CMD_PING, CMD_PONG, CMD_PSH, CMD_SYN};
@@ -42,7 +43,7 @@ use tap::Tap;
use crate::frame::{Header, PingInfo};

const INIT_WINDOW: usize = 10;
const MAX_WINDOW: usize = 500;
const MAX_WINDOW: usize = 1500;
const MSS: usize = 8192;

#[derive(Clone, Copy, Debug)]
@@ -214,8 +215,9 @@ async fn picomux_inner(
let mut remote_window = INIT_WINDOW;
let mut target_remote_window = MAX_WINDOW;
let mut last_window_adjust = Instant::now();
let mut bw_estimate = BwEstimate::new();
loop {
let min_quantum = (target_remote_window / 10).clamp(3, 50);
let min_quantum = (target_remote_window / 10).clamp(1, 500);
let frame = buffer_recv.recv().await;
if frame.header.command == CMD_FIN {
anyhow::bail!("received remote FIN");
@@ -228,28 +230,37 @@ async fn picomux_inner(
target_remote_window,
"queue delay measured"
);
bw_estimate.sample(frame.body.len());
write_incoming
.write_all(&frame.body)
.await
.context("could not write to incoming")?;
remote_window -= 1;

// adjust the target remote window once per window
if last_window_adjust.elapsed().as_millis() > 250 {
last_window_adjust = Instant::now();
if queue_delay.as_millis() > 50 {
target_remote_window = (target_remote_window / 2).max(3);
} else {
target_remote_window = (target_remote_window + 1).min(MAX_WINDOW);
}
tracing::debug!(
stream_id,
queue_delay = debug(queue_delay),
remote_window,
target_remote_window,
"adjusting window"
)
}
// assume the delay is 500ms
target_remote_window = ((bw_estimate.read() / MSS as f64 / 2.0) as usize)
.clamp(INIT_WINDOW, MAX_WINDOW);
tracing::debug!(
target_remote_window,
"setting target remote send window based on bw"
);

// // adjust the target remote window once per window
// if last_window_adjust.elapsed().as_millis() > 250 {
// last_window_adjust = Instant::now();
// if queue_delay.as_millis() > 50 {
// target_remote_window = (target_remote_window / 2).max(3);
// } else {
// target_remote_window = (target_remote_window + 1).min(MAX_WINDOW);
// }
// tracing::debug!(
// stream_id,
// queue_delay = debug(queue_delay),
// remote_window,
// target_remote_window,
// "adjusting window"
// )
// }

if remote_window + min_quantum <= target_remote_window {
let quantum = target_remote_window - remote_window;
@@ -402,7 +413,7 @@ async fn picomux_inner(
loop {
let frame = Frame::read(&mut inner_read).await?;
let stream_id = frame.header.stream_id;
tracing::debug!(
tracing::trace!(
command = frame.header.command,
stream_id,
body_len = frame.header.body_len,
4 changes: 2 additions & 2 deletions libraries/picomux/src/outgoing.rs
Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@ impl Outgoing {
if let Some(err) = self.err.get() {
return Some(Err(anyhow::anyhow!("{:?}", err)));
}
if self.inner.queue.is_empty() {
if self.inner.queue.len() < 10 {
Some(anyhow::Ok(()))
} else {
None
@@ -51,7 +51,7 @@ impl Outgoing {

/// Infallibly, non-blockingly enqueues a frame to be sent to the outgoing writer.
pub fn enqueue(&self, outgoing: Frame) {
tracing::debug!(
tracing::trace!(
command = outgoing.header.command,
stream_id = outgoing.header.stream_id,
body_len = outgoing.header.body_len,
3 changes: 2 additions & 1 deletion libraries/sillad-sosistab3/src/dialer.rs
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ impl<D: Dialer> Dialer for SosistabDialer<D> {
let eph_sk = x25519_dalek::EphemeralSecret::random_from_rng(rand::thread_rng());
let eph_pk: x25519_dalek::PublicKey = (&eph_sk).into();
// we generate a whole lot of random padding
let padding_len: u64 = rand::thread_rng().gen_range(0..=8192);
let padding_len: u64 = rand::thread_rng().gen_range(0..=1024);
let padding = vec![0; padding_len as usize].tap_mut(|v| rand::thread_rng().fill_bytes(v));
let padding_hash = blake3::hash(&padding);
// generate the handshake
@@ -81,6 +81,7 @@ impl<D: Dialer> Dialer for SosistabDialer<D> {
cookie = debug(self.cookie),
padding_len,
padding_hash = debug(padding_hash),
their_padding_len = their_handshake.padding_len,
"their handshake received"
);
// we are ready for the shared secret
2 changes: 1 addition & 1 deletion libraries/sillad-sosistab3/src/listener.rs
Original file line number Diff line number Diff line change
@@ -108,7 +108,7 @@ async fn listen_loop<P: Pipe>(
x25519_dalek::EphemeralSecret::random_from_rng(rand::thread_rng());
let eph_pk: x25519_dalek::PublicKey = (&eph_sk).into();
// we generate a whole lot of random padding
let padding_len: u64 = rand::thread_rng().gen_range(0..=8192);
let padding_len: u64 = rand::thread_rng().gen_range(0..=1024);
let padding = vec![0; padding_len as usize]
.tap_mut(|v| rand::thread_rng().fill_bytes(v));
let padding_hash = blake3::hash(&padding);
30 changes: 15 additions & 15 deletions libraries/sillad/src/tcp.rs
Original file line number Diff line number Diff line change
@@ -76,21 +76,21 @@ impl Listener for TcpListener {
fn set_tcp_options(conn: &Async<TcpStream>) -> std::io::Result<()> {
conn.get_ref().set_nodelay(true)?;

#[cfg(any(target_os = "linux", target_os = "android"))]
unsafe {
use std::os::fd::AsRawFd;
let lowat: libc::c_int = 32768;
let ret = libc::setsockopt(
conn.as_raw_fd(),
libc::IPPROTO_TCP,
libc::TCP_NOTSENT_LOWAT,
&lowat as *const _ as *const libc::c_void,
std::mem::size_of_val(&lowat) as libc::socklen_t,
);
if ret != 0 {
return Err(std::io::Error::last_os_error());
}
}
// #[cfg(any(target_os = "linux", target_os = "android"))]
// unsafe {
// use std::os::fd::AsRawFd;
// let lowat: libc::c_int = 32768;
// let ret = libc::setsockopt(
// conn.as_raw_fd(),
// libc::IPPROTO_TCP,
// libc::TCP_NOTSENT_LOWAT,
// &lowat as *const _ as *const libc::c_void,
// std::mem::size_of_val(&lowat) as libc::socklen_t,
// );
// if ret != 0 {
// return Err(std::io::Error::last_os_error());
// }
// }
Ok(())
}