Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement in 100% safe code #11

Merged
merged 1 commit into from
Jun 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
matrix:
# When updating this, the reminder to update the minimum supported
# Rust version in Cargo.toml.
rust: ['1.35']
rust: ['1.63']
steps:
- uses: actions/checkout@v3
- name: Install Rust
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ name = "easy-parallel"
version = "3.3.0"
authors = ["Stjepan Glavina <[email protected]>"]
edition = "2018"
rust-version = "1.35"
rust-version = "1.63"
description = "Run closures in parallel"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/smol-rs/easy-parallel"
Expand Down
90 changes: 35 additions & 55 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,11 @@
//! ```

#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![forbid(unsafe_code)]

use std::fmt;
use std::iter::{self, FromIterator};
use std::mem;
use std::panic;
use std::process;
use std::sync::mpsc;
use std::thread;

Expand Down Expand Up @@ -256,58 +255,50 @@ impl<'a, T> Parallel<'a, T> {
T: Send + 'a,
C: FromIterator<T>,
{
// Set up a guard that aborts on panic.
let guard = NoPanic;
// Set up a new thread scope.
thread::scope(|scope| {
// Join handles for spawned threads.
let mut handles = Vec::new();

// Join handles for spawned threads.
let mut handles = Vec::new();
// Channels to collect results from spawned threads.
let mut receivers = Vec::new();

// Channels to collect results from spawned threads.
let mut receivers = Vec::new();
for f in self.closures.into_iter() {
// Wrap into a closure that sends the result back.
let (sender, receiver) = mpsc::channel();
let f = move || sender.send(f()).unwrap();

// Spawn a thread for each closure after the first one.
for f in self.closures.into_iter() {
// Wrap into a closure that sends the result back.
let (sender, receiver) = mpsc::channel();
let f = move || sender.send(f()).unwrap();

// Erase the `'a` lifetime.
let f: Box<dyn FnOnce() + Send + 'a> = Box::new(f);
let f: Box<dyn FnOnce() + Send + 'static> = unsafe { mem::transmute(f) };

// Spawn a thread for the closure.
handles.push(thread::spawn(f));
receivers.push(receiver);
}
// Spawn it on the scope.
handles.push(scope.spawn(f));
receivers.push(receiver);
}

let mut last_err = None;
let mut last_err = None;

// Run the main closure on the main thread.
let res = panic::catch_unwind(panic::AssertUnwindSafe(f));
// Run the main closure on the main thread.
let res = panic::catch_unwind(panic::AssertUnwindSafe(f));

// Join threads and save the last panic if there was one.
for h in handles {
if let Err(err) = h.join() {
last_err = Some(err);
// Join threads and save the last panic if there was one.
for h in handles {
if let Err(err) = h.join() {
last_err = Some(err);
}
}
}

// Drop the guard because we may resume a panic now.
drop(guard);

// If a thread has panicked, resume the last collected panic.
if let Some(err) = last_err {
panic::resume_unwind(err);
}
// If a thread has panicked, resume the last collected panic.
if let Some(err) = last_err {
panic::resume_unwind(err);
}

// Collect the results from threads.
let results = receivers.into_iter().map(|r| r.recv().unwrap()).collect();
// Collect the results from threads.
let results = receivers.into_iter().map(|r| r.recv().unwrap()).collect();

// If the main closure panicked, resume its panic.
match res {
Ok(r) => (results, r),
Err(err) => panic::resume_unwind(err),
}
// If the main closure panicked, resume its panic.
match res {
Ok(r) => (results, r),
Err(err) => panic::resume_unwind(err),
}
})
}
}

Expand All @@ -324,14 +315,3 @@ impl<T> Default for Parallel<'_, T> {
Self::new()
}
}

/// Aborts the process if dropped while panicking.
struct NoPanic;

impl Drop for NoPanic {
fn drop(&mut self) {
if thread::panicking() {
process::abort();
}
}
}