diff --git a/src/core/eventloop.cpp b/src/core/eventloop.cpp index 277cce56..9fcf835b 100644 --- a/src/core/eventloop.cpp +++ b/src/core/eventloop.cpp @@ -69,7 +69,7 @@ int EventLoop::registerTimer(int timeout, void (*cb)(void *), void *ctx) void EventLoop::deregister(int id) { - ffi::event_loop_deregister(inner_, id); + assert(ffi::event_loop_deregister(inner_, id) == 0); } EventLoop *EventLoop::instance() diff --git a/src/core/eventloop.rs b/src/core/eventloop.rs index 88d5880d..1d88a7c8 100644 --- a/src/core/eventloop.rs +++ b/src/core/eventloop.rs @@ -62,7 +62,7 @@ impl Evented { } struct Registration { - _evented: Evented, + evented: Evented, activated: bool, callback: Option, } @@ -111,10 +111,11 @@ impl Registrations { let entry = data.nodes.vacant_entry(); let nkey = entry.key(); + evented.registration().set_waker_persistent(true); evented.registration().set_waker(&get_waker(nkey), interest); let reg = Registration { - _evented: evented, + evented, activated: false, callback: Some(callback), }; @@ -124,13 +125,19 @@ impl Registrations { Ok(nkey) } - fn remove(&self, reg_id: usize) { + fn remove(&self, reg_id: usize) -> Result<(), RegistrationsError> { let nkey = reg_id; let data = &mut *self.data.borrow_mut(); + if !data.nodes.contains(nkey) { + return Err(RegistrationsError); + } + data.activated.remove(&mut data.nodes, nkey); data.nodes.remove(nkey); + + Ok(()) } fn activate(&self, reg_id: usize) { @@ -179,23 +186,38 @@ impl Registrations { }; let reg = &mut data.nodes[nkey].value; - reg.activated = false; let callback = reg .callback .take() .expect("registration should have a callback"); + let nkey = if let Evented::Timer(_) = ®.evented { + // remove timer registrations after activation + data.nodes.remove(nkey); + + None + } else { + reg.activated = false; + reg.evented + .registration() + .clear_readiness(mio::Interest::READABLE | mio::Interest::WRITABLE); + + Some(nkey) + }; + (nkey, callback) }; callback.call(); - let data = &mut *self.data.borrow_mut(); + if let Some(nkey) = nkey { + let data = &mut *self.data.borrow_mut(); - let reg = &mut data.nodes[nkey].value; + let reg = &mut data.nodes[nkey].value; - reg.callback = Some(callback); + reg.callback = Some(callback); + } } } @@ -338,8 +360,8 @@ impl EventLoop { .expect("slab should have capacity")) } - pub fn deregister(&self, id: usize) { - self.regs.remove(id); + pub fn deregister(&self, id: usize) -> Result<(), EventLoopError> { + self.regs.remove(id).map_err(|_| EventLoopError) } fn poll_and_dispatch(&self, timeout: Option) -> Option { @@ -504,10 +526,17 @@ mod ffi { #[allow(clippy::missing_safety_doc)] #[no_mangle] - pub unsafe extern "C" fn event_loop_deregister(l: *mut EventLoopRaw, id: libc::size_t) { + pub unsafe extern "C" fn event_loop_deregister( + l: *mut EventLoopRaw, + id: libc::size_t, + ) -> libc::c_int { let l = l.as_mut().unwrap(); - l.deregister(id); + if l.deregister(id).is_err() { + return -1; + } + + 0 } } @@ -516,8 +545,11 @@ mod tests { use super::*; use crate::core::executor::Executor; use crate::core::reactor::Reactor; + use std::cell::Cell; + use std::io; use std::os::fd::AsRawFd; use std::rc::Rc; + use std::thread; struct NoopCallback; @@ -552,24 +584,51 @@ mod tests { let addr = listener.local_addr().unwrap(); let fd = listener.as_raw_fd(); + let count = Rc::new(Cell::new(0)); + let cb = { let l = Rc::clone(&l); let listener = Rc::clone(&listener); + let count = Rc::clone(&count); Box::new(FnCallback(move || { let _stream = listener.accept().unwrap(); - l.exit(0); + + let e = listener.accept().unwrap_err(); + assert_eq!(e.kind(), io::ErrorKind::WouldBlock); + + count.set(count.get() + 1); + if count.get() == 2 { + l.exit(0); + } })) }; let id = l.register_fd(fd, READABLE, cb).unwrap(); - // non-blocking connect attempt to trigger listener - let _stream = mio::net::TcpStream::connect(addr); + { + // non-blocking connect attempt to trigger listener + let _stream = mio::net::TcpStream::connect(addr); + + while count.get() < 1 { + l.step(); + thread::sleep(Duration::from_millis(10)); + } + } + + { + // non-blocking connect attempt to trigger listener + let _stream = mio::net::TcpStream::connect(addr); + + while count.get() < 2 { + l.step(); + thread::sleep(Duration::from_millis(10)); + } + } assert_eq!(l.exec(), 0); - l.deregister(id); + l.deregister(id).unwrap(); } #[test] @@ -591,11 +650,14 @@ mod tests { assert_eq!(l.exec(), 0); - l.deregister(id); + // activated timers automatically deregister + l.deregister(id).unwrap_err(); - assert!(l + let id = l .register_timer(Duration::from_millis(0), Box::new(NoopCallback)) - .is_ok()); + .unwrap(); + + l.deregister(id).unwrap(); } #[test] @@ -630,7 +692,7 @@ mod tests { assert_eq!(l.exec_async().await, 0); - l.deregister(id); + l.deregister(id).unwrap(); }) .unwrap();