diff --git a/Cargo.toml b/Cargo.toml index d3b1f0d..bc14383 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,7 @@ concurrent-queue = "2.2.0" futures-io = { version = "0.3.28", default-features = false, features = ["std"] } futures-lite = { version = "1.11.0", default-features = false } parking = "2.0.0" -polling = "2.6.0" +polling = "3.0.0" rustix = { version = "0.38.2", default-features = false, features = ["std", "fs"] } slab = "0.4.2" socket2 = { version = "0.5.3", features = ["all"] } @@ -53,5 +53,4 @@ timerfd = "1" uds_windows = "1" [patch.crates-io] -polling = { git = "https://github.com/smol-rs/polling.git", branch = "notgull/unsafe2" } async-io = { path = "." } diff --git a/src/reactor.rs b/src/reactor.rs index b93f20d..d6f4ad9 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -15,7 +15,7 @@ use std::time::{Duration, Instant}; use async_lock::OnceCell; use concurrent_queue::ConcurrentQueue; use futures_lite::ready; -use polling::{Event, Poller}; +use polling::{Event, Events, Poller}; use slab::Slab; // Choose the proper implementation of `Registration` based on the target platform. @@ -77,7 +77,7 @@ pub(crate) struct Reactor { /// Temporary storage for I/O events when polling the reactor. /// /// Holding a lock on this event list implies the exclusive right to poll I/O. - events: Mutex>, + events: Mutex, /// An ordered map of registered timers. /// @@ -104,7 +104,7 @@ impl Reactor { poller: Poller::new().expect("cannot initialize I/O event notification"), ticker: AtomicUsize::new(0), sources: Mutex::new(Slab::new()), - events: Mutex::new(Vec::new()), + events: Mutex::new(Events::new()), timers: Mutex::new(BTreeMap::new()), timer_ops: ConcurrentQueue::bounded(TIMER_QUEUE_SIZE), } @@ -268,7 +268,7 @@ impl Reactor { /// A lock on the reactor. pub(crate) struct ReactorLock<'a> { reactor: &'a Reactor, - events: MutexGuard<'a, Vec>, + events: MutexGuard<'a, Events>, } impl ReactorLock<'_> { @@ -331,14 +331,16 @@ impl ReactorLock<'_> { // e.g. we were previously interested in both readability and writability, // but only one of them was emitted. if !state[READ].is_empty() || !state[WRITE].is_empty() { - source.registration.modify( - &self.reactor.poller, - Event { - key: source.key, - readable: !state[READ].is_empty(), - writable: !state[WRITE].is_empty(), - }, - )?; + // Create the event that we are interested in. + let event = { + let mut event = Event::none(source.key); + event.readable = !state[READ].is_empty(); + event.writable = !state[WRITE].is_empty(); + event + }; + + // Register interest in this event. + source.registration.modify(&self.reactor.poller, event)?; } } } @@ -463,14 +465,16 @@ impl Source { // Update interest in this I/O handle. if was_empty { - self.registration.modify( - &Reactor::get().poller, - Event { - key: self.key, - readable: !state[READ].is_empty(), - writable: !state[WRITE].is_empty(), - }, - )?; + // Create the event that we are interested in. + let event = { + let mut event = Event::none(self.key); + event.readable = !state[READ].is_empty(); + event.writable = !state[WRITE].is_empty(); + event + }; + + // Register interest in it. + self.registration.modify(&Reactor::get().poller, event)?; } Poll::Pending @@ -637,14 +641,20 @@ impl> + Clone, T> Future for Ready { // Update interest in this I/O handle. if was_empty { - handle.borrow().source.registration.modify( - &Reactor::get().poller, - Event { - key: handle.borrow().source.key, - readable: !state[READ].is_empty(), - writable: !state[WRITE].is_empty(), - }, - )?; + // Create the event that we are interested in. + let event = { + let mut event = Event::none(handle.borrow().source.key); + event.readable = !state[READ].is_empty(); + event.writable = !state[WRITE].is_empty(); + event + }; + + // Indicate that we are interested in this event. + handle + .borrow() + .source + .registration + .modify(&Reactor::get().poller, event)?; } Poll::Pending diff --git a/src/reactor/kqueue.rs b/src/reactor/kqueue.rs index 46c47e2..9f31af9 100644 --- a/src/reactor/kqueue.rs +++ b/src/reactor/kqueue.rs @@ -63,7 +63,7 @@ impl Registration { poller.add_filter(PollSignal(signal.0), token, PollMode::Oneshot) } Self::Process(process) => poller.add_filter( - Process::new(process, ProcessOps::Exit), + unsafe { Process::new(process, ProcessOps::Exit) }, token, PollMode::Oneshot, ), @@ -83,7 +83,7 @@ impl Registration { poller.modify_filter(PollSignal(signal.0), interest.key, PollMode::Oneshot) } Self::Process(process) => poller.modify_filter( - Process::new(process, ProcessOps::Exit), + unsafe { Process::new(process, ProcessOps::Exit) }, interest.key, PollMode::Oneshot, ), @@ -100,7 +100,9 @@ impl Registration { poller.delete(fd) } Self::Signal(signal) => poller.delete_filter(PollSignal(signal.0)), - Self::Process(process) => poller.delete_filter(Process::new(process, ProcessOps::Exit)), + Self::Process(process) => { + poller.delete_filter(unsafe { Process::new(process, ProcessOps::Exit) }) + } } } }