From 4fc1bea5a79e73398ac27e081d1912bee0d0f40d Mon Sep 17 00:00:00 2001 From: Erin Power Date: Tue, 22 Oct 2024 12:04:51 +0200 Subject: [PATCH] perf: use rayon for io-uring loop --- Cargo.lock | 40 ++++ Cargo.toml | 1 + src/components/proxy/io_uring_shared.rs | 270 ++++++++++++------------ 3 files changed, 173 insertions(+), 138 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ee5554877..a8eba7635 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -491,6 +491,25 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -2380,6 +2399,7 @@ dependencies = [ "quilkin-proto", "quilkin-xds", "rand", + "rayon", "regex", "schemars", "seahash", @@ -2501,6 +2521,26 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.4.1" diff --git a/Cargo.toml b/Cargo.toml index e1d702c46..8902bef26 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -157,6 +157,7 @@ cfg-if = "1.0.0" libflate = "2.0.0" form_urlencoded = "1.2.1" gxhash = "3.4.1" +rayon = "1.10.0" [dependencies.hyper-util] version = "0.1" diff --git a/src/components/proxy/io_uring_shared.rs b/src/components/proxy/io_uring_shared.rs index b9b28eb44..1d1231f01 100644 --- a/src/components/proxy/io_uring_shared.rs +++ b/src/components/proxy/io_uring_shared.rs @@ -526,7 +526,7 @@ impl IoUringLoop { pub fn spawn( self, - thread_name: String, + _thread_name: String, mut ctx: PacketProcessorCtx, receiver: PacketReceiver, buffer_pool: Arc, @@ -551,156 +551,150 @@ impl IoUringLoop { // Used to notify the uring loop to shutdown let mut shutdown_event = EventFd::new()?; - std::thread::Builder::new() - .name(thread_name) - .spawn(move || { - let _guard = tracing::dispatcher::set_default(&dispatcher); - - let tokens = slab::Slab::with_capacity(concurrent_sends + 1 + 1 + 1); - let loop_packets = slab::Slab::with_capacity(concurrent_sends + 1); - - // Create an eventfd to notify the uring thread (this one) of - // pending sends - let pending_sends = PendingSends::new(pending_sends_event.writer()); - // Just double buffer the pending writes for simplicity - let mut double_pending_sends = Vec::new(); - - // When sending packets, this is the direction used when updating metrics - let send_dir = if matches!(ctx, PacketProcessorCtx::Router { .. }) { - metrics::WRITE - } else { - metrics::READ - }; - - // Spawn the worker tasks that process in an async context unlike - // our io-uring loop below - spawn_workers( - &rt, - receiver, - pending_sends.clone(), - shutdown, - shutdown_event.writer(), - ); - - let (submitter, sq, mut cq) = ring.split(); - - let mut loop_ctx = LoopCtx { - sq, - socket_fd: socket.raw_fd(), - backlog: Default::default(), - loop_packets, - tokens, - }; - - loop_ctx.enqueue_recv(buffer_pool.clone().alloc()); - loop_ctx - .push_with_token(pending_sends_event.io_uring_entry(), Token::PendingsSends); - loop_ctx.push_with_token(shutdown_event.io_uring_entry(), Token::Shutdown); - - // Sync always needs to be called when entries have been pushed - // onto the submission queue for the loop to actually function (ie, similar to await on futures) - loop_ctx.sync(); + rayon::spawn(move || { + let _guard = tracing::dispatcher::set_default(&dispatcher); - // Notify that we have set everything up - let _ = tx.send(()); - let mut last_received_at = None; - let process_event_writer = process_event.writer(); - - // The core io uring loop - 'io: loop { - match submitter.submit_and_wait(1) { - Ok(_) => {} - Err(ref err) if err.raw_os_error() == Some(libc::EBUSY) => {} - Err(ref err) if err.raw_os_error() == Some(libc::EINTR) => { - continue; - } - Err(error) => { - tracing::error!(%error, "io-uring submit_and_wait failed"); - return; - } - } - cq.sync(); + let tokens = slab::Slab::with_capacity(concurrent_sends + 1 + 1 + 1); + let loop_packets = slab::Slab::with_capacity(concurrent_sends + 1); - if let Err(error) = loop_ctx.process_backlog(&submitter) { - tracing::error!(%error, "failed to process io-uring backlog"); + // Create an eventfd to notify the uring thread (this one) of + // pending sends + let pending_sends = PendingSends::new(pending_sends_event.writer()); + // Just double buffer the pending writes for simplicity + let mut double_pending_sends = Vec::new(); + + // When sending packets, this is the direction used when updating metrics + let send_dir = if matches!(ctx, PacketProcessorCtx::Router { .. }) { + metrics::WRITE + } else { + metrics::READ + }; + + // Spawn the worker tasks that process in an async context unlike + // our io-uring loop below + spawn_workers( + &rt, + receiver, + pending_sends.clone(), + shutdown, + shutdown_event.writer(), + ); + + let (submitter, sq, mut cq) = ring.split(); + + let mut loop_ctx = LoopCtx { + sq, + socket_fd: socket.raw_fd(), + backlog: Default::default(), + loop_packets, + tokens, + }; + + loop_ctx.enqueue_recv(buffer_pool.clone().alloc()); + loop_ctx.push_with_token(pending_sends_event.io_uring_entry(), Token::PendingsSends); + loop_ctx.push_with_token(shutdown_event.io_uring_entry(), Token::Shutdown); + + // Sync always needs to be called when entries have been pushed + // onto the submission queue for the loop to actually function (ie, similar to await on futures) + loop_ctx.sync(); + + // Notify that we have set everything up + let _ = tx.send(()); + let mut last_received_at = None; + let process_event_writer = process_event.writer(); + + // The core io uring loop + 'io: loop { + match submitter.submit_and_wait(1) { + Ok(_) => {} + Err(ref err) if err.raw_os_error() == Some(libc::EBUSY) => {} + Err(ref err) if err.raw_os_error() == Some(libc::EINTR) => { + continue; + } + Err(error) => { + tracing::error!(%error, "io-uring submit_and_wait failed"); return; } + } + cq.sync(); - // Now actually process all of the completed io requests - for cqe in &mut cq { - let ret = cqe.result(); - let token_index = cqe.user_data() as usize; - - let token = loop_ctx.remove(token_index); - match token { - Token::Recv { key } => { - // Pop the packet regardless of whether we failed or not so that - // we don't consume a buffer slot forever - let packet = loop_ctx.pop_packet(key); - - if ret < 0 { - let error = std::io::Error::from_raw_os_error(-ret); - tracing::error!(%error, "error receiving packet"); - loop_ctx.enqueue_recv(buffer_pool.clone().alloc()); - continue; - } - - let packet = packet.finalize_recv(ret as usize); - process_packet( - &mut ctx, - &process_event_writer, - packet, - &mut last_received_at, - ); + if let Err(error) = loop_ctx.process_backlog(&submitter) { + tracing::error!(%error, "failed to process io-uring backlog"); + return; + } + // Now actually process all of the completed io requests + for cqe in &mut cq { + let ret = cqe.result(); + let token_index = cqe.user_data() as usize; + + let token = loop_ctx.remove(token_index); + match token { + Token::Recv { key } => { + // Pop the packet regardless of whether we failed or not so that + // we don't consume a buffer slot forever + let packet = loop_ctx.pop_packet(key); + + if ret < 0 { + let error = std::io::Error::from_raw_os_error(-ret); + tracing::error!(%error, "error receiving packet"); loop_ctx.enqueue_recv(buffer_pool.clone().alloc()); + continue; } - Token::PendingsSends => { - double_pending_sends = pending_sends.swap(double_pending_sends); - loop_ctx.push_with_token( - pending_sends_event.io_uring_entry(), - Token::PendingsSends, - ); - - for pending in - double_pending_sends.drain(0..double_pending_sends.len()) - { - loop_ctx.enqueue_send(pending); - } - } - Token::Send { key } => { - let packet = loop_ctx.pop_packet(key).finalize_send(); - let asn_info = packet.asn_info.as_ref().into(); - - if ret < 0 { - let source = - std::io::Error::from_raw_os_error(-ret).to_string(); - metrics::errors_total(send_dir, &source, &asn_info).inc(); - metrics::packets_dropped_total(send_dir, &source, &asn_info) - .inc(); - } else if ret as usize != packet.buffer.len() { - metrics::packets_total(send_dir, &asn_info).inc(); - metrics::errors_total( - send_dir, - "sent bytes != packet length", - &asn_info, - ) - .inc(); - } else { - metrics::packets_total(send_dir, &asn_info).inc(); - metrics::bytes_total(send_dir, &asn_info).inc_by(ret as u64); - } + + let packet = packet.finalize_recv(ret as usize); + process_packet( + &mut ctx, + &process_event_writer, + packet, + &mut last_received_at, + ); + + loop_ctx.enqueue_recv(buffer_pool.clone().alloc()); + } + Token::PendingsSends => { + double_pending_sends = pending_sends.swap(double_pending_sends); + loop_ctx.push_with_token( + pending_sends_event.io_uring_entry(), + Token::PendingsSends, + ); + + for pending in double_pending_sends.drain(0..double_pending_sends.len()) + { + loop_ctx.enqueue_send(pending); } - Token::Shutdown => { - tracing::info!("io-uring loop shutdown requested"); - break 'io; + } + Token::Send { key } => { + let packet = loop_ctx.pop_packet(key).finalize_send(); + let asn_info = packet.asn_info.as_ref().into(); + + if ret < 0 { + let source = std::io::Error::from_raw_os_error(-ret).to_string(); + metrics::errors_total(send_dir, &source, &asn_info).inc(); + metrics::packets_dropped_total(send_dir, &source, &asn_info).inc(); + } else if ret as usize != packet.buffer.len() { + metrics::packets_total(send_dir, &asn_info).inc(); + metrics::errors_total( + send_dir, + "sent bytes != packet length", + &asn_info, + ) + .inc(); + } else { + metrics::packets_total(send_dir, &asn_info).inc(); + metrics::bytes_total(send_dir, &asn_info).inc_by(ret as u64); } } + Token::Shutdown => { + tracing::info!("io-uring loop shutdown requested"); + break 'io; + } } - - loop_ctx.sync(); } - })?; + + loop_ctx.sync(); + } + }); Ok(rx) }