Skip to content

Commit

Permalink
Add ReadBufPool
Browse files Browse the repository at this point in the history
Specialised read buffer pool that wraps around a10's ReadBufPool, which
wraps around io_uring read buffer pool.
  • Loading branch information
Thomasdezeeuw committed Apr 6, 2024
1 parent bbd6215 commit 70f08ab
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 9 deletions.
12 changes: 6 additions & 6 deletions rt/src/io/buf_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use a10::io::{Buf as _, BufMut as _};
/// This is a special buffer pool that can only be used in read operations done
/// by the kernel, i.e. no in-memory operations. The buffer pool is actually
/// managed by the kernel in `read(2)` and `recv(2)` like calls. Instead of user
/// space having to select a buffer before issueing the read call, the kernel
/// space having to select a buffer before issuing the read call, the kernel
/// will select a buffer from the pool when it's ready for reading. This avoids
/// the need to have as many buffers as concurrent read calls.
///
Expand Down Expand Up @@ -103,7 +103,7 @@ impl ReadBuf {
///
/// If the buffer is shorter then `len` bytes this does nothing.
pub fn truncate(&mut self, len: usize) {
self.inner.truncate(len)
self.inner.truncate(len);
}

/// Clear the buffer.
Expand All @@ -113,7 +113,7 @@ impl ReadBuf {
/// This is not the same as returning the buffer to the buffer pool, for
/// that use [`ReadBuf::release`].
pub fn clear(&mut self) {
self.inner.clear()
self.inner.clear();
}

/// Remove the bytes in `range` from the buffer.
Expand All @@ -125,7 +125,7 @@ impl ReadBuf {
where
R: RangeBounds<usize>,
{
self.inner.remove(range)
self.inner.remove(range);
}

/// Set the length of the buffer to `new_len`.
Expand All @@ -135,7 +135,7 @@ impl ReadBuf {
/// The caller must ensure `new_len` bytes are initialised and that
/// `new_len` is not larger than the buffer's capacity.
pub unsafe fn set_len(&mut self, new_len: usize) {
self.inner.set_len(new_len)
self.inner.set_len(new_len);
}

/// Appends `other` to `self`.
Expand Down Expand Up @@ -163,7 +163,7 @@ impl ReadBuf {
///
/// This is automatically called in the `Drop` implementation.
pub fn release(&mut self) {
self.inner.release()
self.inner.release();
}
}

Expand Down
32 changes: 30 additions & 2 deletions rt/tests/functional/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::path::PathBuf;
use heph::actor::{self, actor_fn};
use heph_rt::access::ThreadLocal;
use heph_rt::fs::{self, Advice, AllocateMode, File};
use heph_rt::io::{Read, Write};
use heph_rt::test::block_on_local_actor;
use heph_rt::io::{Read, ReadBufPool, Write};
use heph_rt::test::{block_on_actor, block_on_local_actor};

use crate::util::{temp_dir_root, temp_file};

Expand Down Expand Up @@ -35,6 +35,34 @@ fn file_read_write() {
block_on_local_actor(actor_fn(actor), ());
}

#[test]
fn file_read_write_readbuf_threadlocal() {
block_on_local_actor(actor_fn(file_read_write_readbuf), ());
}

#[test]
fn file_read_write_readbuf_threadsafe() {
block_on_actor(actor_fn(file_read_write_readbuf), ());
}

async fn file_read_write_readbuf<RT: heph_rt::Access>(ctx: actor::Context<!, RT>) {
let buf_pool = ReadBufPool::new(ctx.runtime_ref(), 2, 4096).unwrap();
let path = temp_file("file_read_write_readbuf_threadsafe");
let file = File::create(ctx.runtime_ref(), path).await.unwrap();
let mut buf = (&file).read(buf_pool.get()).await.unwrap();
assert!(buf.is_empty());
buf.release();

(&file).write_all(DATA1).await.unwrap();
let mut buf = file.read_at(buf, 0).await.unwrap();
assert_eq!(buf.as_slice(), DATA1);
buf.release();

(&file).write_all_at(&DATA2[7..], 7).await.unwrap();
let buf = file.read_at(buf, 0).await.unwrap();
assert_eq!(buf.as_slice(), DATA2);
}

#[test]
fn file_from_std() {
async fn actor(ctx: actor::Context<!, ThreadLocal>) {
Expand Down
10 changes: 9 additions & 1 deletion rt/tests/functional/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use std::cmp::min;
use std::ptr;
use std::sync::Arc;

use heph_rt::io::{Buf, BufMut, BufMutSlice, BufSlice};
use heph_rt::io::{Buf, BufMut, BufMutSlice, BufSlice, ReadBuf, ReadBufPool};

use crate::util::assert_size;

const DATA: &[u8] = b"Hello world!";
const DATA2: &[u8] = b"Hello mars.";
Expand Down Expand Up @@ -282,3 +284,9 @@ fn test_buf_mut_slice<B: BufMutSlice<2>>(mut bufs: B, expected_limit: usize) {
assert!(!bufs.has_spare_capacity());
assert!(bufs.total_spare_capacity() == 0);
}

#[test]
fn read_buf_pool_size() {
assert_size::<ReadBufPool>(8);
assert_size::<ReadBuf>(24);
}

0 comments on commit 70f08ab

Please sign in to comment.