Skip to content

Commit

Permalink
Match a10::Ring setup in testing
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomasdezeeuw committed Apr 6, 2024
1 parent 358d33f commit bbd6215
Show file tree
Hide file tree
Showing 5 changed files with 318 additions and 2 deletions.
30 changes: 30 additions & 0 deletions rt/src/io/buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,20 @@ pub unsafe trait BufMut: 'static {
/// [`TcpStream::recv_n`]: crate::net::TcpStream::recv_n
unsafe fn update_length(&mut self, n: usize);

/// Not part of the stable API.
/// **Do not overwrite, things will go badly**.
#[doc(hidden)]
fn buffer_group(&self) -> Option<a10::io::BufGroupId> {
None
}

/// Not part of the stable API.
/// **Do not overwrite, things will go badly**.
#[doc(hidden)]
unsafe fn buffer_init(&mut self, _: a10::io::BufIdx, n: u32) {
self.update_length(n as usize);
}

/// Extend the buffer with `bytes`, returns the number of bytes copied.
fn extend_from_slice(&mut self, bytes: &[u8]) -> usize {
let (ptr, capacity) = unsafe { self.parts_mut() };
Expand Down Expand Up @@ -652,6 +666,14 @@ unsafe impl<B: BufMut> a10::io::BufMut for BufWrapper<B> {
unsafe fn set_init(&mut self, n: usize) {
self.0.update_length(n);
}

fn buffer_group(&self) -> Option<a10::io::BufGroupId> {
self.0.buffer_group()
}

unsafe fn buffer_init(&mut self, idx: a10::io::BufIdx, n: u32) {
self.0.buffer_init(idx, n);
}
}

unsafe impl<B: BufMut> BufMut for BufWrapper<B> {
Expand All @@ -670,6 +692,14 @@ unsafe impl<B: BufMut> BufMut for BufWrapper<B> {
fn has_spare_capacity(&self) -> bool {
self.0.has_spare_capacity()
}

fn buffer_group(&self) -> Option<a10::io::BufGroupId> {
self.0.buffer_group()
}

unsafe fn buffer_init(&mut self, idx: a10::io::BufIdx, n: u32) {
self.0.buffer_init(idx, n);
}
}

unsafe impl<B: Buf> a10::io::Buf for BufWrapper<B> {
Expand Down
273 changes: 273 additions & 0 deletions rt/src/io/buf_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
//! Module with read buffer pool.
//!
//! See [`ReadBufPool`].
use std::borrow::{Borrow, BorrowMut};
use std::mem::MaybeUninit;
use std::ops::{Deref, DerefMut, RangeBounds};
use std::{fmt, io};

use crate::io::{Buf, BufMut};
use crate::Access;

use a10::io::{Buf as _, BufMut as _};

/// A read buffer pool.
///
/// This is a special buffer pool that can only be used in read operations done
/// by the kernel, i.e. no in-memory operations. The buffer pool is actually
/// managed by the kernel in `read(2)` and `recv(2)` like calls. Instead of user
/// space having to select a buffer before issueing the read call, the kernel
/// will select a buffer from the pool when it's ready for reading. This avoids
/// the need to have as many buffers as concurrent read calls.
///
/// As a result of this the returned buffer, [`ReadBuf`], is somewhat limited.
/// For example it can't grow beyond the pool's buffer size. However it can be
/// used in write calls like any other buffer.
#[derive(Clone, Debug)]
pub struct ReadBufPool {
inner: a10::io::ReadBufPool,
}

impl ReadBufPool {
/// Create a new buffer pool.
///
/// `pool_size` must be a power of 2, with a maximum of 2^15 (32768).
/// `buf_size` is the maximum capacity of the buffer. Note that buffer can't
/// grow beyond this capacity.
pub fn new<RT>(rt: &RT, pool_size: u16, buf_size: u32) -> io::Result<ReadBufPool>
where
RT: Access,
{
a10::io::ReadBufPool::new(rt.submission_queue(), pool_size, buf_size)
.map(|inner| ReadBufPool { inner })
}

/// Get a buffer reference to this pool.
///
/// This can only be used in kernel read I/O operations, such as
/// [`TcpStream::recv`]. However it won't yet select a buffer to use. This
/// is done by the kernel once it actually has data to write into the
/// buffer. Before it's used in a read call the returned buffer will be
/// empty and can't be resized, it's effecitvely useless before a read call.
///
/// [`TcpStream::recv`]: crate::net::TcpStream::recv
pub fn get(&self) -> ReadBuf {
ReadBuf {
inner: self.inner.get(),
}
}
}

/// Buffer reference from a [`ReadBufPool`].
///
/// Before a read system call, this will be empty and can't be resized. This is
/// really only useful after reading into it.
///
/// # Notes
///
/// Do **not** use the [`BufMut`] implementation of this buffer to write into
/// it, it's a specialised implementation that is invalid use to outside of the
/// Heph crate.
pub struct ReadBuf {
inner: a10::io::ReadBuf,
}

impl ReadBuf {
/// Returns the capacity of the buffer.
pub fn capacity(&self) -> usize {
self.inner.capacity()
}

/// Returns the length of the buffer.
pub fn len(&self) -> usize {
self.inner.len()
}

/// Returns true if the buffer is empty.
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}

/// Returns itself as slice.
pub fn as_slice(&self) -> &[u8] {
self
}

/// Returns itself as mutable slice.
pub fn as_mut_slice(&mut self) -> &mut [u8] {
self
}

/// Truncate the buffer to `len` bytes.
///
/// If the buffer is shorter then `len` bytes this does nothing.
pub fn truncate(&mut self, len: usize) {
self.inner.truncate(len)
}

/// Clear the buffer.
///
/// # Notes
///
/// This is not the same as returning the buffer to the buffer pool, for
/// that use [`ReadBuf::release`].
pub fn clear(&mut self) {
self.inner.clear()
}

/// Remove the bytes in `range` from the buffer.
///
/// # Panics
///
/// This will panic if the `range` is invalid.
pub fn remove<R>(&mut self, range: R)
where
R: RangeBounds<usize>,
{
self.inner.remove(range)
}

/// Set the length of the buffer to `new_len`.
///
/// # Safety
///
/// The caller must ensure `new_len` bytes are initialised and that
/// `new_len` is not larger than the buffer's capacity.
pub unsafe fn set_len(&mut self, new_len: usize) {
self.inner.set_len(new_len)
}

/// Appends `other` to `self`.
///
/// If `self` doesn't have sufficient capacity it will return `Err(())` and
/// will not append anything.
#[allow(clippy::result_unit_err)]
pub fn extend_from_slice(&mut self, other: &[u8]) -> Result<(), ()> {
self.inner.extend_from_slice(other)
}

/// Returns the remaining spare capacity of the buffer.
pub fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<u8>] {
self.inner.spare_capacity_mut()
}

/// Release the buffer back to the buffer pool.
///
/// If `self` isn't an allocated buffer this does nothing.
///
/// The buffer can still be used in a `read(2)` system call, it's reset to
/// the state as if it was just created by calling [`ReadBufPool::get`].
///
/// # Notes
///
/// This is automatically called in the `Drop` implementation.
pub fn release(&mut self) {
self.inner.release()
}
}

/// The implementation for `ReadBuf` is a special one as we don't actually pass
/// a "real" buffer. Instead we pass special flags to the kernel that allows it
/// to select a buffer from the connected [`ReadBufPool`] once the actual read
/// operation starts.
///
/// If the `ReadBuf` is used a second time in a read call this changes as at
/// that point it owns an actual buffer. At that point it will behave more like
/// the `Vec<u8>` implementation is that it only uses the unused capacity, so
/// any bytes already in the buffer will be untouched.
///
/// To revert to the original behaviour of allowing the kernel to select a
/// buffer call [`ReadBuf::release`] first.
///
/// Note that this can **not** be used in vectored I/O as a part of the
/// [`BufMutSlice`] trait.
///
/// [`BufMutSlice`]: crate::io::BufMutSlice
unsafe impl BufMut for ReadBuf {
unsafe fn parts_mut(&mut self) -> (*mut u8, usize) {
let (ptr, len) = self.inner.parts_mut();
(ptr, len as usize)
}

unsafe fn update_length(&mut self, n: usize) {
self.set_len(n);
}

fn buffer_group(&self) -> Option<a10::io::BufGroupId> {
self.inner.buffer_group()
}

unsafe fn buffer_init(&mut self, idx: a10::io::BufIdx, n: u32) {
self.inner.buffer_init(idx, n);
}

fn spare_capacity(&self) -> usize {
self.inner.capacity() - self.inner.len()
}

fn has_spare_capacity(&self) -> bool {
self.inner.capacity() >= self.inner.len()
}
}

// SAFETY: `ReadBuf` manages the allocation of the bytes once it's assigned a
// buffer, so as long as it's alive, so is the slice of bytes.
unsafe impl Buf for ReadBuf {
unsafe fn parts(&self) -> (*const u8, usize) {
let (ptr, len) = self.inner.parts();
(ptr, len as usize)
}

fn as_slice(&self) -> &[u8] {
self.inner.as_slice()
}

fn len(&self) -> usize {
self.inner.len()
}
}

impl Deref for ReadBuf {
type Target = [u8];

fn deref(&self) -> &Self::Target {
self.inner.deref()
}
}

impl DerefMut for ReadBuf {
fn deref_mut(&mut self) -> &mut Self::Target {
self.inner.deref_mut()
}
}

impl AsRef<[u8]> for ReadBuf {
fn as_ref(&self) -> &[u8] {
self
}
}

impl AsMut<[u8]> for ReadBuf {
fn as_mut(&mut self) -> &mut [u8] {
self
}
}

impl Borrow<[u8]> for ReadBuf {
fn borrow(&self) -> &[u8] {
self
}
}

impl BorrowMut<[u8]> for ReadBuf {
fn borrow_mut(&mut self) -> &mut [u8] {
self
}
}

impl fmt::Debug for ReadBuf {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(f)
}
}
6 changes: 6 additions & 0 deletions rt/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
//! [`parts_mut`]: BufMut::parts_mut
//! [`update_length`]: BufMut::update_length
//!
//! [`ReadBufPool`] is a specialised read buffer pool that can only be used in
//! read operations done by the kernel, i.e. no in-memory operations.
//!
//! # Working with Standard I/O Streams
//!
//! The [`stdin`], [`stdout`] and [`stderr`] function provide handles to
Expand All @@ -56,6 +59,9 @@ mod buf;
pub(crate) use buf::BufWrapper;
pub use buf::{Buf, BufMut, BufMutSlice, BufSlice, Limited};

mod buf_pool;
pub use buf_pool::{ReadBuf, ReadBufPool};

pub(crate) mod futures;

mod traits;
Expand Down
5 changes: 4 additions & 1 deletion rt/src/shared/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ impl RuntimeInternals {
/// Same as [`RuntimeInternals::setup`], but doesn't attach to an existing [`a10::Ring`].
#[cfg(any(test, feature = "test"))]
pub(crate) fn test_setup(ring_entries: u32) -> io::Result<RuntimeSetup> {
let ring = a10::Ring::new(ring_entries)?;
let ring = a10::Ring::config(ring_entries)
.with_kernel_thread(true)
.build()?;

// Don't have a coordinator so we use our own submission queue.
let coordinator_sq = ring.submission_queue().clone();
Ok(RuntimeSetup {
Expand Down
6 changes: 5 additions & 1 deletion rt/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ pub(crate) fn setup(
/// Test version of [`setup`].
#[cfg(any(test, feature = "test"))]
pub(crate) fn setup_test() -> io::Result<(WorkerSetup, a10::SubmissionQueue)> {
let ring = a10::Ring::config(128).build()?;
let ring = a10::Ring::config(128)
.disable() // Enabled on the worker thread.
.single_issuer()
.with_kernel_thread(true)
.build()?;
Ok(setup2(NonZeroUsize::MAX, ring))
}

Expand Down

0 comments on commit bbd6215

Please sign in to comment.