From 4aa4065d5774011610e6d9de2845c1e9257a1f27 Mon Sep 17 00:00:00 2001
From: Kevin Reid <kpreid@switchb.org>
Date: Sun, 15 Sep 2024 21:09:13 -0700
Subject: [PATCH] listen: Allow `Filter` to have a stack buffer for output
 batches.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

I haven’t run benchmarks to confirm that this is actually worth doing.
I believe that either it is, or the whole message-batching system needs
rethinking — this is just finishing the implementation of the
current strategy.
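
For reference, the new buffer size knob is used like this (this
mirrors the updated `filter_batching_nzst` test below; `notifier`,
`sink`, and `CaptureBatch` are all taken from that test, nothing new):

    notifier.listen(
        CaptureBatch(sink.listener())
            .filter(|&x: &i32| Some(x))
            .with_stack_buffer::<2>(),
    );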

Something we could perhaps do instead of this is to create a notion of
message delivery “transactions”, within which individual messages are
delivered one at a time while a lock is taken once for the duration.
That way, it wouldn’t be necessary to gather messages into a slice to
achieve efficient locking; the disadvantage is that it exposes more
locking.
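
As a very rough sketch only (the names below are hypothetical and
nothing like this exists in the crate today), such a transactional API
might look something like:

    trait Listener<M> {
        /// Prepare to receive a batch of messages, e.g. by taking a
        /// lock once up front.
        fn begin(&self) -> impl ReceiveTransaction<M> + '_;
    }

    trait ReceiveTransaction<M> {
        /// Deliver one message; returns whether the listener still
        /// wants to receive further messages.
        fn receive_one(&mut self, message: M) -> bool;
    }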
---
 all-is-cubes/src/listen.rs      | 10 +++-
 all-is-cubes/src/listen/util.rs | 86 ++++++++++++++++++++++-----------
 2 files changed, 67 insertions(+), 29 deletions(-)

diff --git a/all-is-cubes/src/listen.rs b/all-is-cubes/src/listen.rs
index 8e23d6a6d..680756a8a 100644
--- a/all-is-cubes/src/listen.rs
+++ b/all-is-cubes/src/listen.rs
@@ -324,10 +324,16 @@ pub trait Listener<M>: fmt::Debug + SendSyncIfStd {
         Arc::new(self)
     }
 
-    /// Apply a map/filter function to incoming messages.
+    /// Apply a map/filter function (similar to [`Iterator::filter_map()`]) to incoming messages.
+    ///
+    /// Note: By default, this filter breaks up all message batches into batches of 1.
+    /// In order to avoid this and have more efficient message delivery, use
+    /// [`Filter::with_stack_buffer()`].
+    /// This is unnecessary if `size_of::<M>() == 0`; the buffer is automatically unbounded in
+    /// that case.
     ///
     /// TODO: Doc test
-    fn filter<MI, F>(self, function: F) -> Filter<F, Self>
+    fn filter<MI, F>(self, function: F) -> Filter<F, Self, 1>
     where
         Self: Sized,
         F: for<'a> Fn(&'a MI) -> Option<M> + Sync,
diff --git a/all-is-cubes/src/listen/util.rs b/all-is-cubes/src/listen/util.rs
index 5034593e1..2f0065224 100644
--- a/all-is-cubes/src/listen/util.rs
+++ b/all-is-cubes/src/listen/util.rs
@@ -1,5 +1,4 @@
 use alloc::sync::{Arc, Weak};
-use alloc::vec::Vec;
 use core::fmt;
 
 use manyfmt::formats::Unquote;
@@ -12,15 +11,40 @@ use crate::listen::{Listener, Notifier};
 ///
 /// This may be used to drop uninteresting messages or reduce their granularity.
 ///
+/// * `F` is the type of the filter function to use.
+/// * `T` is the type of the listener to pass filtered messages to.
+/// * `BATCH` is the maximum number of filtered messages to gather before passing them on.
+///   It is used as the size of a stack-allocated array, so should be chosen with the size of
+///   the message type in mind.
+///
 /// TODO: add doc test
-pub struct Filter<F, T> {
+pub struct Filter<F, T, const BATCH: usize> {
     /// The function to transform and possibly discard each message.
     pub(super) function: F,
     /// The recipient of the messages.
     pub(super) target: T,
 }
 
-impl<F, T: fmt::Debug> fmt::Debug for Filter<F, T> {
+impl<F, T> Filter<F, T, 1> {
+    /// Request that the filter accumulate output messages into a batch.
+    ///
+    /// This causes each [receive](Listener::receive) operation to allocate an array `[MO; BATCH]`
+    /// on the stack, where `MO` is the output message type produced by `F`.
+    /// Therefore, the buffer size should be chosen keeping the size of `MO` in mind.
+    /// Also, the number of buffered messages cannot exceed the size of the input batch,
+    /// so it is not useful to choose a buffer size larger than the expected input batch size.
+    ///
+    /// If `MO` is a zero-sized type, then the buffer is always unbounded,
+    /// so `with_stack_buffer()` has no effect and is unnecessary in that case.
+    pub fn with_stack_buffer<const BATCH: usize>(self) -> Filter<F, T, BATCH> {
+        Filter {
+            function: self.function,
+            target: self.target,
+        }
+    }
+}
+
+impl<F, T: fmt::Debug, const BATCH: usize> fmt::Debug for Filter<F, T, BATCH> {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         f.debug_struct("Filter")
             // function's type name may be the function name
@@ -30,7 +54,7 @@ impl<F, T: fmt::Debug> fmt::Debug for Filter<F, T> {
     }
 }
 
-impl<MI, MO, F, T> Listener<MI> for Filter<F, T>
+impl<MI, MO, F, T, const BATCH: usize> Listener<MI> for Filter<F, T, BATCH>
 where
     F: Fn(&MI) -> Option<MO> + Send + Sync,
     T: Listener<MO>,
@@ -40,35 +64,33 @@ where
             // If the size of the output message is zero, then we can buffer an arbitrary number
             // of them without occupying any memory or performing any allocation, and therefore
             // preserve the input batching for free.
-            let mut filtered_messages = Vec::<MO>::new();
+            let mut filtered_messages = alloc::vec::Vec::<MO>::new();
             for message in messages {
                 if let Some(filtered_message) = (self.function)(message) {
                     filtered_messages.push(filtered_message);
                 }
             }
             // Deliver entire batch of ZST messages.
-            self.target.receive(filtered_messages.as_slice())
+            return self.target.receive(filtered_messages.as_slice());
         } else {
             if messages.is_empty() {
-                // Ensure that we still check liveness if the batch is empty.
                 return self.target.receive(&[]);
             }
 
+            let mut buffer: arrayvec::ArrayVec<MO, BATCH> = arrayvec::ArrayVec::new();
             for message in messages {
                 if let Some(filtered_message) = (self.function)(message) {
-                    // TODO: figure out some kind of stack array batching so we don't do a separate
-                    // receive() for each message.
-                    let alive = self.target.receive(&[filtered_message]);
-                    if !alive {
-                        return false;
+                    if buffer.is_full() {
+                        let alive = self.target.receive(&buffer);
+                        if !alive {
+                            return false;
+                        }
+                        buffer.clear();
                     }
+                    buffer.push(filtered_message);
                 }
             }
-
-            // If we got here without returning false, then either we delivered at least one message
-            // and know that self.target is alive, or all the messages were filtered out.
-            // (Hmm, should we check liveness anyway in the latter case?)
-            true
+            self.target.receive(&buffer)
         }
     }
 }
@@ -190,27 +212,29 @@ mod tests {
         assert_eq!(notifier.count(), 0);
     }
 
-    /// Currently, `Filter` breaks up all batches into single element batches
-    /// (unless the type is a ZST).
     #[test]
     fn filter_batching_nzst() {
         let notifier: Notifier<i32> = Notifier::new();
         let sink: Sink<Vec<i32>> = Sink::new();
-        notifier.listen(CaptureBatch(sink.listener()).filter(|&x: &i32| Some(x)));
+        notifier.listen(
+            CaptureBatch(sink.listener())
+                .filter(|&x: &i32| Some(x))
+                .with_stack_buffer::<2>(),
+        );
 
         // Send some batches
         notifier.notify_many(&[0, 1]);
         notifier.notify_many(&[]);
-        notifier.notify_many(&[2, 3]);
+        notifier.notify_many(&[2, 3, 4]);
 
-        // Currently, the batches are all of size 1.
+        // Expect the batches to be of size at most 2 (the configured stack buffer size).
         assert_eq!(
             sink.drain(),
-            vec![vec![], vec![0], vec![1], vec![], vec![2], vec![3],]
+            vec![vec![], vec![0, 1], vec![], vec![2, 3], vec![4]]
         );
     }
 
-    /// Currently, `Filter` breaks up all batches. This is not ideal.
+    /// If the message type is a ZST, then batches are unbounded.
     #[test]
     fn filter_batching_zst() {
         let notifier: Notifier<i32> = Notifier::new();
@@ -222,10 +246,18 @@ mod tests {
         // Send some batches
         notifier.notify_many(&[0, 1]);
         notifier.notify_many(&[]);
-        notifier.notify_many(&[2, 3]);
+        notifier.notify_many(&[2, 3, 4, 5]);
 
-        // Expect batches to be preserved and filtered.
-        assert_eq!(sink.drain(), vec![vec![], vec![(), ()], vec![], vec![()]]);
+        // Expect batches to be preserved and filtered, even though we didn’t set a batch size.
+        assert_eq!(
+            sink.drain(),
+            vec![
+                vec![],           // initial liveness check on listen()
+                vec![(), ()],     // first nonempty batch
+                vec![],           // empty batch
+                vec![(), (), ()], // second nonempty batch, with 1 item dropped
+            ]
+        );
     }
 
     #[test]