diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b6b168f..ed6c161 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -45,7 +45,7 @@ jobs: matrix: # When updating this, the reminder to update the minimum supported # Rust version in Cargo.toml. - rust: ['1.35'] + rust: ['1.63'] steps: - uses: actions/checkout@v3 - name: Install Rust diff --git a/Cargo.toml b/Cargo.toml index 940a114..f6d9070 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ name = "easy-parallel" version = "3.3.0" authors = ["Stjepan Glavina "] edition = "2018" -rust-version = "1.35" +rust-version = "1.63" description = "Run closures in parallel" license = "Apache-2.0 OR MIT" repository = "https://github.com/smol-rs/easy-parallel" diff --git a/src/lib.rs b/src/lib.rs index 6bef1c5..0ee20a5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -57,12 +57,11 @@ //! ``` #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] +#![forbid(unsafe_code)] use std::fmt; use std::iter::{self, FromIterator}; -use std::mem; use std::panic; -use std::process; use std::sync::mpsc; use std::thread; @@ -256,58 +255,50 @@ impl<'a, T> Parallel<'a, T> { T: Send + 'a, C: FromIterator, { - // Set up a guard that aborts on panic. - let guard = NoPanic; + // Set up a new thread scope. + thread::scope(|scope| { + // Join handles for spawned threads. + let mut handles = Vec::new(); - // Join handles for spawned threads. - let mut handles = Vec::new(); + // Channels to collect results from spawned threads. + let mut receivers = Vec::new(); - // Channels to collect results from spawned threads. - let mut receivers = Vec::new(); + for f in self.closures.into_iter() { + // Wrap into a closure that sends the result back. + let (sender, receiver) = mpsc::channel(); + let f = move || sender.send(f()).unwrap(); - // Spawn a thread for each closure after the first one. - for f in self.closures.into_iter() { - // Wrap into a closure that sends the result back. - let (sender, receiver) = mpsc::channel(); - let f = move || sender.send(f()).unwrap(); - - // Erase the `'a` lifetime. - let f: Box = Box::new(f); - let f: Box = unsafe { mem::transmute(f) }; - - // Spawn a thread for the closure. - handles.push(thread::spawn(f)); - receivers.push(receiver); - } + // Spawn it on the scope. + handles.push(scope.spawn(f)); + receivers.push(receiver); + } - let mut last_err = None; + let mut last_err = None; - // Run the main closure on the main thread. - let res = panic::catch_unwind(panic::AssertUnwindSafe(f)); + // Run the main closure on the main thread. + let res = panic::catch_unwind(panic::AssertUnwindSafe(f)); - // Join threads and save the last panic if there was one. - for h in handles { - if let Err(err) = h.join() { - last_err = Some(err); + // Join threads and save the last panic if there was one. + for h in handles { + if let Err(err) = h.join() { + last_err = Some(err); + } } - } - - // Drop the guard because we may resume a panic now. - drop(guard); - // If a thread has panicked, resume the last collected panic. - if let Some(err) = last_err { - panic::resume_unwind(err); - } + // If a thread has panicked, resume the last collected panic. + if let Some(err) = last_err { + panic::resume_unwind(err); + } - // Collect the results from threads. - let results = receivers.into_iter().map(|r| r.recv().unwrap()).collect(); + // Collect the results from threads. + let results = receivers.into_iter().map(|r| r.recv().unwrap()).collect(); - // If the main closure panicked, resume its panic. - match res { - Ok(r) => (results, r), - Err(err) => panic::resume_unwind(err), - } + // If the main closure panicked, resume its panic. + match res { + Ok(r) => (results, r), + Err(err) => panic::resume_unwind(err), + } + }) } } @@ -324,14 +315,3 @@ impl Default for Parallel<'_, T> { Self::new() } } - -/// Aborts the process if dropped while panicking. -struct NoPanic; - -impl Drop for NoPanic { - fn drop(&mut self) { - if thread::panicking() { - process::abort(); - } - } -}