From ed1e6b40d5540ab1b1441d6e92aecb5db7cbe472 Mon Sep 17 00:00:00 2001 From: Justin Karneges Date: Fri, 21 Feb 2025 17:51:04 -0800 Subject: [PATCH 1/2] eventloop: various registration management fixes --- src/core/eventloop.cpp | 2 +- src/core/eventloop.rs | 103 +++++++++++++++++++++++++++++++++-------- 2 files changed, 85 insertions(+), 20 deletions(-) diff --git a/src/core/eventloop.cpp b/src/core/eventloop.cpp index 277cce56..9fcf835b 100644 --- a/src/core/eventloop.cpp +++ b/src/core/eventloop.cpp @@ -69,7 +69,7 @@ int EventLoop::registerTimer(int timeout, void (*cb)(void *), void *ctx) void EventLoop::deregister(int id) { - ffi::event_loop_deregister(inner_, id); + assert(ffi::event_loop_deregister(inner_, id) == 0); } EventLoop *EventLoop::instance() diff --git a/src/core/eventloop.rs b/src/core/eventloop.rs index 88d5880d..940f9359 100644 --- a/src/core/eventloop.rs +++ b/src/core/eventloop.rs @@ -62,7 +62,7 @@ impl Evented { } struct Registration { - _evented: Evented, + evented: Evented, activated: bool, callback: Option, } @@ -111,10 +111,11 @@ impl Registrations { let entry = data.nodes.vacant_entry(); let nkey = entry.key(); + evented.registration().set_waker_persistent(true); evented.registration().set_waker(&get_waker(nkey), interest); let reg = Registration { - _evented: evented, + evented, activated: false, callback: Some(callback), }; @@ -124,13 +125,19 @@ impl Registrations { Ok(nkey) } - fn remove(&self, reg_id: usize) { + fn remove(&self, reg_id: usize) -> Result<(), RegistrationsError> { let nkey = reg_id; let data = &mut *self.data.borrow_mut(); + if !data.nodes.contains(nkey) { + return Err(RegistrationsError); + } + data.activated.remove(&mut data.nodes, nkey); data.nodes.remove(nkey); + + Ok(()) } fn activate(&self, reg_id: usize) { @@ -179,23 +186,38 @@ impl Registrations { }; let reg = &mut data.nodes[nkey].value; - reg.activated = false; let callback = reg .callback .take() .expect("registration should have a callback"); + let nkey = if let Evented::Timer(_) = ®.evented { + // remove timer registrations after activation + data.nodes.remove(nkey); + + None + } else { + reg.activated = false; + reg.evented + .registration() + .clear_readiness(mio::Interest::READABLE | mio::Interest::WRITABLE); + + Some(nkey) + }; + (nkey, callback) }; callback.call(); - let data = &mut *self.data.borrow_mut(); + if let Some(nkey) = nkey { + let data = &mut *self.data.borrow_mut(); - let reg = &mut data.nodes[nkey].value; + let reg = &mut data.nodes[nkey].value; - reg.callback = Some(callback); + reg.callback = Some(callback); + } } } @@ -338,8 +360,11 @@ impl EventLoop { .expect("slab should have capacity")) } - pub fn deregister(&self, id: usize) { - self.regs.remove(id); + pub fn deregister(&self, id: usize) -> Result<(), EventLoopError> { + match self.regs.remove(id) { + Ok(()) => Ok(()), + Err(_) => Err(EventLoopError), + } } fn poll_and_dispatch(&self, timeout: Option) -> Option { @@ -504,10 +529,17 @@ mod ffi { #[allow(clippy::missing_safety_doc)] #[no_mangle] - pub unsafe extern "C" fn event_loop_deregister(l: *mut EventLoopRaw, id: libc::size_t) { + pub unsafe extern "C" fn event_loop_deregister( + l: *mut EventLoopRaw, + id: libc::size_t, + ) -> libc::c_int { let l = l.as_mut().unwrap(); - l.deregister(id); + if l.deregister(id).is_err() { + return -1; + } + + 0 } } @@ -516,8 +548,11 @@ mod tests { use super::*; use crate::core::executor::Executor; use crate::core::reactor::Reactor; + use std::cell::Cell; + use std::io; use std::os::fd::AsRawFd; use std::rc::Rc; + use std::thread; struct NoopCallback; @@ -552,24 +587,51 @@ mod tests { let addr = listener.local_addr().unwrap(); let fd = listener.as_raw_fd(); + let count = Rc::new(Cell::new(0)); + let cb = { let l = Rc::clone(&l); let listener = Rc::clone(&listener); + let count = Rc::clone(&count); Box::new(FnCallback(move || { let _stream = listener.accept().unwrap(); - l.exit(0); + + let e = listener.accept().unwrap_err(); + assert_eq!(e.kind(), io::ErrorKind::WouldBlock); + + count.set(count.get() + 1); + if count.get() == 2 { + l.exit(0); + } })) }; let id = l.register_fd(fd, READABLE, cb).unwrap(); - // non-blocking connect attempt to trigger listener - let _stream = mio::net::TcpStream::connect(addr); + { + // non-blocking connect attempt to trigger listener + let _stream = mio::net::TcpStream::connect(addr); + + while count.get() < 1 { + l.step(); + thread::sleep(Duration::from_millis(10)); + } + } + + { + // non-blocking connect attempt to trigger listener + let _stream = mio::net::TcpStream::connect(addr); + + while count.get() < 2 { + l.step(); + thread::sleep(Duration::from_millis(10)); + } + } assert_eq!(l.exec(), 0); - l.deregister(id); + l.deregister(id).unwrap(); } #[test] @@ -591,11 +653,14 @@ mod tests { assert_eq!(l.exec(), 0); - l.deregister(id); + // activated timers automatically deregister + l.deregister(id).unwrap_err(); - assert!(l + let id = l .register_timer(Duration::from_millis(0), Box::new(NoopCallback)) - .is_ok()); + .unwrap(); + + l.deregister(id).unwrap(); } #[test] @@ -630,7 +695,7 @@ mod tests { assert_eq!(l.exec_async().await, 0); - l.deregister(id); + l.deregister(id).unwrap(); }) .unwrap(); From d34f09db200dc314be131e36ee61889e12fd07b4 Mon Sep 17 00:00:00 2001 From: Justin Karneges Date: Mon, 24 Feb 2025 10:21:18 -0800 Subject: [PATCH 2/2] nit --- src/core/eventloop.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/core/eventloop.rs b/src/core/eventloop.rs index 940f9359..1d88a7c8 100644 --- a/src/core/eventloop.rs +++ b/src/core/eventloop.rs @@ -361,10 +361,7 @@ impl EventLoop { } pub fn deregister(&self, id: usize) -> Result<(), EventLoopError> { - match self.regs.remove(id) { - Ok(()) => Ok(()), - Err(_) => Err(EventLoopError), - } + self.regs.remove(id).map_err(|_| EventLoopError) } fn poll_and_dispatch(&self, timeout: Option) -> Option {