Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Setup tracing instrumentation #40

Merged
merged 3 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jobs:
matrix:
# When updating this, the reminder to update the minimum supported
# Rust version in Cargo.toml.
rust: ['1.48']
rust: ['1.56']
steps:
- uses: actions/checkout@v3
- name: Install Rust
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ name = "blocking"
version = "1.3.1"
authors = ["Stjepan Glavina <[email protected]>"]
edition = "2018"
rust-version = "1.48"
rust-version = "1.56"
description = "A thread pool for isolating blocking I/O in async programs"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/smol-rs/blocking"
Expand All @@ -21,8 +21,8 @@ async-task = "4.0.2"
fastrand = "1.3.4"
futures-io = { version = "0.3.28", default-features = false, features = ["std"] }
futures-lite = { version = "1.11.0", default-features = false }
log = "0.4.17"
piper = "0.2.0"
tracing = { version = "0.1.37", default-features = false }

[dev-dependencies]
futures-lite = "1.11.0"
27 changes: 25 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ impl Executor {
///
/// This function runs blocking tasks until it becomes idle and times out.
fn main_loop(&'static self) {
let span = tracing::trace_span!("blocking::main_loop");
let _enter = span.enter();

let mut inner = self.inner.lock().unwrap();
loop {
// This thread is not idle anymore because it's going to run tasks.
Expand All @@ -207,6 +210,7 @@ impl Executor {

// Put the thread to sleep until another task is scheduled.
let timeout = Duration::from_millis(500);
tracing::trace!(?timeout, "going to sleep");
let (lock, res) = self.cvar.wait_timeout(inner, timeout).unwrap();
inner = lock;

Expand All @@ -216,7 +220,11 @@ impl Executor {
inner.thread_count -= 1;
break;
}

tracing::trace!("notified");
}

tracing::trace!("shutting down due to lack of tasks");
}

/// Schedules a runnable task for execution.
Expand All @@ -231,11 +239,21 @@ impl Executor {

/// Spawns more blocking threads if the pool is overloaded with work.
fn grow_pool(&'static self, mut inner: MutexGuard<'static, Inner>) {
let span = tracing::error_span!(
"grow_pool",
queue_len = inner.queue.len(),
idle_count = inner.idle_count,
thread_count = inner.thread_count,
);
let _enter = span.enter();

// If runnable tasks greatly outnumber idle threads and there aren't too many threads
// already, then be aggressive: wake all idle threads and spawn one more thread.
while inner.queue.len() > inner.idle_count * 5
&& inner.thread_count < inner.thread_limit.get()
{
tracing::trace!("spawning a new thread to handle blocking tasks");

// The new thread starts in idle state.
inner.idle_count += 1;
inner.thread_count += 1;
Expand All @@ -253,7 +271,7 @@ impl Executor {
.spawn(move || self.main_loop())
{
// We were unable to spawn the thread, so we need to undo the state changes.
log::error!("Failed to spawn a blocking thread: {}", e);
tracing::error!("failed to spawn a blocking thread: {}", e);
inner.idle_count -= 1;
inner.thread_count -= 1;

Expand All @@ -264,7 +282,12 @@ impl Executor {

// If the limit is about to be set to zero, set it to one instead so that if,
// in the future, we are able to spawn more threads, we will be able to do so.
NonZeroUsize::new(new_limit).unwrap_or_else(|| NonZeroUsize::new(1).unwrap())
NonZeroUsize::new(new_limit).unwrap_or_else(|| {
tracing::warn!(
"attempted to lower thread_limit to zero; setting to one instead"
);
NonZeroUsize::new(1).unwrap()
})
};
}
}
Expand Down