Skip to content

Commit

Permalink
stream: fix polling issues leading to missed events
Browse files Browse the repository at this point in the history
Stream wakeups don't always correspond to what is in the subscription
event queue. This removes the ready flag which was kind of redundant
anyways.

Changelog-Fixed: Fix polling issues leading to missed async events
Signed-off-by: William Casarin <[email protected]>
  • Loading branch information
jb55 committed Jan 16, 2025
1 parent f254bd5 commit 00ea48e
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 7 deletions.
8 changes: 2 additions & 6 deletions src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use tracing::error;
/// Used to track query futures
#[derive(Debug, Clone)]
pub(crate) struct SubscriptionState {
pub ready: bool,
pub done: bool,
pub waker: Option<std::task::Waker>,
}
Expand Down Expand Up @@ -83,7 +82,6 @@ impl Stream for SubscriptionStream {
let me = pinned.as_ref().get_ref();
let mut map = me.ndb.subs.lock().unwrap();
let sub_state = map.entry(me.sub_id).or_insert(SubscriptionState {
ready: false,
done: false,
waker: None,
});
Expand All @@ -93,10 +91,8 @@ impl Stream for SubscriptionStream {
return Poll::Ready(None);
}

if sub_state.ready {
// Reset ready, fetch notes
sub_state.ready = false;
let notes = me.ndb.poll_for_notes(me.sub_id, me.max_notes);
let notes = me.ndb.poll_for_notes(me.sub_id, me.max_notes);
if !notes.is_empty() {
return Poll::Ready(Some(notes));
}

Expand Down
1 change: 0 additions & 1 deletion src/ndb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ impl Ndb {
let mut config = config.set_sub_callback(move |sub_id: u64| {
let mut map = subs_clone.lock().unwrap();
if let Some(s) = map.get_mut(&Subscription::new(sub_id)) {
s.ready = true;
if let Some(w) = s.waker.take() {
w.wake();
}
Expand Down

0 comments on commit 00ea48e

Please sign in to comment.