Skip to content

Commit

Permalink
listen: Allow Filter to have a stack buffer for output batches.
Browse files Browse the repository at this point in the history
I haven’t run benchmarks to confirm that this is actually worth doing.
I believe that either it is, or the whole message-batching system needs
rethinking — this is just finishing the implementation of the
current strategy.

Something we could perhaps do instead of this is create a notion of
message delivery “transactions” within which individual messages are
delivered one at a time, allowing taking a lock for the duration.
This way, it wouldn’t be necessary to compile messages into a slice
to achieve efficient locking; the disadvantage is that it exposes
more locking.
  • Loading branch information
kpreid committed Nov 14, 2024
1 parent 09e62dd commit 4aa4065
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 29 deletions.
10 changes: 8 additions & 2 deletions all-is-cubes/src/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,16 @@ pub trait Listener<M>: fmt::Debug + SendSyncIfStd {
Arc::new(self)
}

/// Apply a map/filter function to incoming messages.
/// Apply a map/filter function (similar to [`Iterator::filter_map()`]) to incoming messages.
///
/// Note: By default, this filter breaks up all message batching into batches of 1.
/// In order to avoid this and have more efficient message delivery, use
/// [`Filter::with_stack_buffer()`].
/// This is unnecessary if `size_of::<M>() == 0`; the buffer is automatically unbounded in
/// that case.
///
/// TODO: Doc test
fn filter<MI, F>(self, function: F) -> Filter<F, Self>
fn filter<MI, F>(self, function: F) -> Filter<F, Self, 1>
where
Self: Sized,
F: for<'a> Fn(&'a MI) -> Option<M> + Sync,
Expand Down
86 changes: 59 additions & 27 deletions all-is-cubes/src/listen/util.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use alloc::sync::{Arc, Weak};
use alloc::vec::Vec;
use core::fmt;

use manyfmt::formats::Unquote;
Expand All @@ -12,15 +11,40 @@ use crate::listen::{Listener, Notifier};
///
/// This may be used to drop uninteresting messages or reduce their granularity.
///
/// * `F` is the type of the filter function to use.
/// * `T` is the type of the listener to pass filtered messages to.
/// * `BATCH` is the maximum number of filtered messages to gather before passing them on.
/// It is used as the size of a stack-allocated array, so should be chosen with the size of
/// the message type in mind.
///
/// TODO: add doc test
pub struct Filter<F, T> {
pub struct Filter<F, T, const BATCH: usize> {
    // Note: `BATCH` appears only as a const generic parameter; it sizes a
    // stack-allocated buffer inside `Listener::receive()` and contributes no
    // field (and therefore no size) to this struct itself.
    /// The function to transform and possibly discard each message.
    pub(super) function: F,
    /// The recipient of the messages.
    pub(super) target: T,
}

impl<F, T: fmt::Debug> fmt::Debug for Filter<F, T> {
impl<F, T> Filter<F, T, 1> {
    /// Request that the filter accumulate output messages into a batch.
    ///
    /// Each [receive](Listener::receive) operation will then allocate an array
    /// `[MO; BATCH]` on the stack, where `MO` is the output message type produced
    /// by `F`; choose the buffer size with `size_of::<MO>()` in mind.
    /// The amount of buffer actually used can never exceed the size of the
    /// incoming batch, so a `BATCH` larger than the expected input batch size
    /// provides no benefit.
    ///
    /// If `MO` is a zero-sized type, then the buffer is always unbounded,
    /// so `with_stack_buffer()` has no effect and is unnecessary in that case.
    pub fn with_stack_buffer<const BATCH: usize>(self) -> Filter<F, T, BATCH> {
        // Only the const generic changes; move the fields across unchanged.
        let Self { function, target } = self;
        Filter { function, target }
    }
}

impl<F, T: fmt::Debug, const BATCH: usize> fmt::Debug for Filter<F, T, BATCH> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Filter")
// function's type name may be the function name
Expand All @@ -30,7 +54,7 @@ impl<F, T: fmt::Debug> fmt::Debug for Filter<F, T> {
}
}

impl<MI, MO, F, T> Listener<MI> for Filter<F, T>
impl<MI, MO, F, T, const BATCH: usize> Listener<MI> for Filter<F, T, BATCH>
where
F: Fn(&MI) -> Option<MO> + Send + Sync,
T: Listener<MO>,
Expand All @@ -40,35 +64,33 @@ where
// If the size of the output message is zero, then we can buffer an arbitrary number
// of them without occupying any memory or performing any allocation, and therefore
// preserve the input batching for free.
let mut filtered_messages = Vec::<MO>::new();
let mut filtered_messages = alloc::vec::Vec::<MO>::new();
for message in messages {
if let Some(filtered_message) = (self.function)(message) {
filtered_messages.push(filtered_message);
}
}
// Deliver entire batch of ZST messages.
self.target.receive(filtered_messages.as_slice())
return self.target.receive(filtered_messages.as_slice());
} else {
if messages.is_empty() {
// Ensure that we still check liveness if the batch is empty.
return self.target.receive(&[]);
}

let mut buffer: arrayvec::ArrayVec<MO, BATCH> = arrayvec::ArrayVec::new();
for message in messages {
if let Some(filtered_message) = (self.function)(message) {
// TODO: figure out some kind of stack array batching so we don't do a separate
// receive() for each message.
let alive = self.target.receive(&[filtered_message]);
if !alive {
return false;
if buffer.is_full() {
let alive = self.target.receive(&buffer);
if !alive {
return false;
}
buffer.clear();
}
buffer.push(filtered_message);
}
}

// If we got here without returning false, then either we delivered at least one message
// and know that self.target is alive, or all the messages were filtered out.
// (Hmm, should we check liveness anyway in the latter case?)
true
self.target.receive(&buffer)
}
}
}
Expand Down Expand Up @@ -190,27 +212,29 @@ mod tests {
assert_eq!(notifier.count(), 0);
}

/// With a stack buffer configured, `Filter` gathers non-ZST output messages
/// into batches of at most `BATCH` (here 2) elements instead of delivering
/// them one at a time.
#[test]
fn filter_batching_nzst() {
    let notifier: Notifier<i32> = Notifier::new();
    let sink: Sink<Vec<i32>> = Sink::new();
    notifier.listen(
        CaptureBatch(sink.listener())
            .filter(|&x: &i32| Some(x))
            .with_stack_buffer::<2>(),
    );

    // Send some batches of varying sizes.
    notifier.notify_many(&[0, 1]);
    notifier.notify_many(&[]);
    notifier.notify_many(&[2, 3, 4]);

    // Expect the batches to be of size at most 2.
    // The leading empty batch is the liveness check performed by listen(),
    // and [2, 3, 4] splits into [2, 3] + [4] when the buffer fills.
    assert_eq!(
        sink.drain(),
        vec![vec![], vec![0, 1], vec![], vec![2, 3], vec![4]]
    );
}

/// Currently, `Filter` breaks up all batches. This is not ideal.
/// If the message value is a ZST, then batches are unbounded.
#[test]
fn filter_batching_zst() {
let notifier: Notifier<i32> = Notifier::new();
Expand All @@ -222,10 +246,18 @@ mod tests {
// Send some batches
notifier.notify_many(&[0, 1]);
notifier.notify_many(&[]);
notifier.notify_many(&[2, 3]);
notifier.notify_many(&[2, 3, 4, 5]);

// Expect batches to be preserved and filtered.
assert_eq!(sink.drain(), vec![vec![], vec![(), ()], vec![], vec![()]]);
// Expect batches to be preserved and filtered, even though we didn’t set a batch size.
assert_eq!(
sink.drain(),
vec![
vec![], // initial liveness check on listen()
vec![(), ()], // first nonempty batch
vec![], // empty batch
vec![(), (), ()], // second nonempty batch, with 1 item dropped
]
);
}

#[test]
Expand Down

0 comments on commit 4aa4065

Please sign in to comment.