Skip to content

Commit

Permalink
Update for polling v3.0.0
Browse files Browse the repository at this point in the history
Signed-off-by: John Nunley <[email protected]>
  • Loading branch information
notgull committed Sep 5, 2023
1 parent dcd10c0 commit d01748d
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 33 deletions.
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ concurrent-queue = "2.2.0"
futures-io = { version = "0.3.28", default-features = false, features = ["std"] }
futures-lite = { version = "1.11.0", default-features = false }
parking = "2.0.0"
polling = "2.6.0"
polling = "3.0.0"
rustix = { version = "0.38.2", default-features = false, features = ["std", "fs"] }
slab = "0.4.2"
socket2 = { version = "0.5.3", features = ["all"] }
Expand All @@ -53,5 +53,4 @@ timerfd = "1"
uds_windows = "1"

[patch.crates-io]
polling = { git = "https://github.com/smol-rs/polling.git", branch = "notgull/unsafe2" }
async-io = { path = "." }
66 changes: 38 additions & 28 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::time::{Duration, Instant};
use async_lock::OnceCell;
use concurrent_queue::ConcurrentQueue;
use futures_lite::ready;
use polling::{Event, Poller};
use polling::{Event, Events, Poller};
use slab::Slab;

// Choose the proper implementation of `Registration` based on the target platform.
Expand Down Expand Up @@ -77,7 +77,7 @@ pub(crate) struct Reactor {
/// Temporary storage for I/O events when polling the reactor.
///
/// Holding a lock on this event list implies the exclusive right to poll I/O.
events: Mutex<Vec<Event>>,
events: Mutex<Events>,

/// An ordered map of registered timers.
///
Expand All @@ -104,7 +104,7 @@ impl Reactor {
poller: Poller::new().expect("cannot initialize I/O event notification"),
ticker: AtomicUsize::new(0),
sources: Mutex::new(Slab::new()),
events: Mutex::new(Vec::new()),
events: Mutex::new(Events::new()),
timers: Mutex::new(BTreeMap::new()),
timer_ops: ConcurrentQueue::bounded(TIMER_QUEUE_SIZE),
}
Expand Down Expand Up @@ -268,7 +268,7 @@ impl Reactor {
/// A lock on the reactor.
pub(crate) struct ReactorLock<'a> {
reactor: &'a Reactor,
events: MutexGuard<'a, Vec<Event>>,
events: MutexGuard<'a, Events>,
}

impl ReactorLock<'_> {
Expand Down Expand Up @@ -331,14 +331,16 @@ impl ReactorLock<'_> {
// e.g. we were previously interested in both readability and writability,
// but only one of them was emitted.
if !state[READ].is_empty() || !state[WRITE].is_empty() {
source.registration.modify(
&self.reactor.poller,
Event {
key: source.key,
readable: !state[READ].is_empty(),
writable: !state[WRITE].is_empty(),
},
)?;
// Create the event that we are interested in.
let event = {
let mut event = Event::none(source.key);
event.readable = !state[READ].is_empty();
event.writable = !state[WRITE].is_empty();
event
};

// Register interest in this event.
source.registration.modify(&self.reactor.poller, event)?;
}
}
}
Expand Down Expand Up @@ -463,14 +465,16 @@ impl Source {

// Update interest in this I/O handle.
if was_empty {
self.registration.modify(
&Reactor::get().poller,
Event {
key: self.key,
readable: !state[READ].is_empty(),
writable: !state[WRITE].is_empty(),
},
)?;
// Create the event that we are interested in.
let event = {
let mut event = Event::none(self.key);
event.readable = !state[READ].is_empty();
event.writable = !state[WRITE].is_empty();
event
};

// Register interest in it.
self.registration.modify(&Reactor::get().poller, event)?;
}

Poll::Pending
Expand Down Expand Up @@ -637,14 +641,20 @@ impl<H: Borrow<crate::Async<T>> + Clone, T> Future for Ready<H, T> {

// Update interest in this I/O handle.
if was_empty {
handle.borrow().source.registration.modify(
&Reactor::get().poller,
Event {
key: handle.borrow().source.key,
readable: !state[READ].is_empty(),
writable: !state[WRITE].is_empty(),
},
)?;
// Create the event that we are interested in.
let event = {
let mut event = Event::none(handle.borrow().source.key);
event.readable = !state[READ].is_empty();
event.writable = !state[WRITE].is_empty();
event
};

// Indicate that we are interested in this event.
handle
.borrow()
.source
.registration
.modify(&Reactor::get().poller, event)?;
}

Poll::Pending
Expand Down
8 changes: 5 additions & 3 deletions src/reactor/kqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Registration {
poller.add_filter(PollSignal(signal.0), token, PollMode::Oneshot)
}
Self::Process(process) => poller.add_filter(
Process::new(process, ProcessOps::Exit),
unsafe { Process::new(process, ProcessOps::Exit) },
token,
PollMode::Oneshot,
),
Expand All @@ -83,7 +83,7 @@ impl Registration {
poller.modify_filter(PollSignal(signal.0), interest.key, PollMode::Oneshot)
}
Self::Process(process) => poller.modify_filter(
Process::new(process, ProcessOps::Exit),
unsafe { Process::new(process, ProcessOps::Exit) },
interest.key,
PollMode::Oneshot,
),
Expand All @@ -100,7 +100,9 @@ impl Registration {
poller.delete(fd)
}
Self::Signal(signal) => poller.delete_filter(PollSignal(signal.0)),
Self::Process(process) => poller.delete_filter(Process::new(process, ProcessOps::Exit)),
Self::Process(process) => {
poller.delete_filter(unsafe { Process::new(process, ProcessOps::Exit) })
}
}
}
}

0 comments on commit d01748d

Please sign in to comment.