diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 3242d3ce2ea..bde40577e4f 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -602,3 +602,15 @@ macro_rules! cfg_is_wasm_not_wasi { )* } } + +macro_rules! cfg_metrics_variant { + (stable: {$($stable_code:tt)*}, unstable: {$($unstable_code:tt)*}) => { + cfg_not_unstable_metrics! { + $($stable_code)* + } + + cfg_unstable_metrics! { + $($unstable_code)* + } + } +} diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index aa6b78db779..324ca363395 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -1,125 +1,189 @@ -use crate::runtime::metrics::{HistogramBatch, WorkerMetrics}; +use crate::runtime::metrics::WorkerMetrics; + +cfg_unstable_metrics! { + use crate::runtime::metrics::HistogramBatch; +} use std::sync::atomic::Ordering::Relaxed; use std::time::{Duration, Instant}; pub(crate) struct MetricsBatch { + /// The total busy duration in nanoseconds. + busy_duration_total: u64, + + /// Instant at which work last resumed (continued after park). + processing_scheduled_tasks_started_at: Instant, + + #[cfg(tokio_unstable)] /// Number of times the worker parked. park_count: u64, + #[cfg(tokio_unstable)] /// Number of times the worker parked and unparked. park_unpark_count: u64, + #[cfg(tokio_unstable)] /// Number of times the worker woke w/o doing work. noop_count: u64, + #[cfg(tokio_unstable)] /// Number of tasks stolen. steal_count: u64, + #[cfg(tokio_unstable)] /// Number of times tasks where stolen. steal_operations: u64, + #[cfg(tokio_unstable)] /// Number of tasks that were polled by the worker. poll_count: u64, + #[cfg(tokio_unstable)] /// Number of tasks polled when the worker entered park. This is used to /// track the noop count. poll_count_on_last_park: u64, + #[cfg(tokio_unstable)] /// Number of tasks that were scheduled locally on this worker. local_schedule_count: u64, + #[cfg(tokio_unstable)] /// Number of tasks moved to the global queue to make space in the local /// queue overflow_count: u64, - /// The total busy duration in nanoseconds. - busy_duration_total: u64, - - /// Instant at which work last resumed (continued after park). - processing_scheduled_tasks_started_at: Instant, - + #[cfg(tokio_unstable)] /// If `Some`, tracks poll times in nanoseconds poll_timer: Option, } -struct PollTimer { - /// Histogram of poll counts within each band. - poll_counts: HistogramBatch, +cfg_unstable_metrics! { + struct PollTimer { + /// Histogram of poll counts within each band. + poll_counts: HistogramBatch, - /// Instant when the most recent task started polling. - poll_started_at: Instant, + /// Instant when the most recent task started polling. + poll_started_at: Instant, + } } impl MetricsBatch { pub(crate) fn new(worker_metrics: &WorkerMetrics) -> MetricsBatch { let now = Instant::now(); + Self::new_unstable(worker_metrics, now) + } - MetricsBatch { - park_count: 0, - park_unpark_count: 0, - noop_count: 0, - steal_count: 0, - steal_operations: 0, - poll_count: 0, - poll_count_on_last_park: 0, - local_schedule_count: 0, - overflow_count: 0, - busy_duration_total: 0, - processing_scheduled_tasks_started_at: now, - poll_timer: worker_metrics - .poll_count_histogram - .as_ref() - .map(|worker_poll_counts| PollTimer { - poll_counts: HistogramBatch::from_histogram(worker_poll_counts), - poll_started_at: now, - }), + cfg_metrics_variant! { + stable: { + #[inline(always)] + fn new_unstable(_worker_metrics: &WorkerMetrics, now: Instant) -> MetricsBatch { + MetricsBatch { + busy_duration_total: 0, + processing_scheduled_tasks_started_at: now, + } + } + }, + unstable: { + #[inline(always)] + fn new_unstable(worker_metrics: &WorkerMetrics, now: Instant) -> MetricsBatch { + MetricsBatch { + park_count: 0, + park_unpark_count: 0, + noop_count: 0, + steal_count: 0, + steal_operations: 0, + poll_count: 0, + poll_count_on_last_park: 0, + local_schedule_count: 0, + overflow_count: 0, + busy_duration_total: 0, + processing_scheduled_tasks_started_at: now, + poll_timer: worker_metrics.poll_count_histogram.as_ref().map( + |worker_poll_counts| PollTimer { + poll_counts: HistogramBatch::from_histogram(worker_poll_counts), + poll_started_at: now, + }, + ), + } + } } } pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) { - worker.mean_poll_time.store(mean_poll_time, Relaxed); - worker.park_count.store(self.park_count, Relaxed); - worker - .park_unpark_count - .store(self.park_unpark_count, Relaxed); - worker.noop_count.store(self.noop_count, Relaxed); - worker.steal_count.store(self.steal_count, Relaxed); - worker - .steal_operations - .store(self.steal_operations, Relaxed); - worker.poll_count.store(self.poll_count, Relaxed); - worker .busy_duration_total .store(self.busy_duration_total, Relaxed); - worker - .local_schedule_count - .store(self.local_schedule_count, Relaxed); - worker.overflow_count.store(self.overflow_count, Relaxed); + self.submit_unstable(worker, mean_poll_time); + } - if let Some(poll_timer) = &self.poll_timer { - let dst = worker.poll_count_histogram.as_ref().unwrap(); - poll_timer.poll_counts.submit(dst); + cfg_metrics_variant! { + stable: { + #[inline(always)] + fn submit_unstable(&mut self, _worker: &WorkerMetrics, _mean_poll_time: u64) {} + }, + unstable: { + #[inline(always)] + fn submit_unstable(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) { + worker.mean_poll_time.store(mean_poll_time, Relaxed); + worker.park_count.store(self.park_count, Relaxed); + worker + .park_unpark_count + .store(self.park_unpark_count, Relaxed); + worker.noop_count.store(self.noop_count, Relaxed); + worker.steal_count.store(self.steal_count, Relaxed); + worker + .steal_operations + .store(self.steal_operations, Relaxed); + worker.poll_count.store(self.poll_count, Relaxed); + + worker + .local_schedule_count + .store(self.local_schedule_count, Relaxed); + worker.overflow_count.store(self.overflow_count, Relaxed); + + if let Some(poll_timer) = &self.poll_timer { + let dst = worker.poll_count_histogram.as_ref().unwrap(); + poll_timer.poll_counts.submit(dst); + } + } } } - /// The worker is about to park. - pub(crate) fn about_to_park(&mut self) { - self.park_count += 1; - self.park_unpark_count += 1; - - if self.poll_count_on_last_park == self.poll_count { - self.noop_count += 1; - } else { - self.poll_count_on_last_park = self.poll_count; + cfg_metrics_variant! { + stable: { + /// The worker is about to park. + pub(crate) fn about_to_park(&mut self) {} + }, + unstable: { + /// The worker is about to park. + pub(crate) fn about_to_park(&mut self) { + #[cfg(tokio_unstable)] + { + self.park_count += 1; + self.park_unpark_count += 1; + + if self.poll_count_on_last_park == self.poll_count { + self.noop_count += 1; + } else { + self.poll_count_on_last_park = self.poll_count; + } + } + } } } - /// The worker was unparked. - pub(crate) fn unparked(&mut self) { - self.park_unpark_count += 1; + cfg_metrics_variant! { + stable: { + /// The worker was unparked. + pub(crate) fn unparked(&mut self) {} + }, + unstable: { + /// The worker was unparked. + pub(crate) fn unparked(&mut self) { + self.park_unpark_count += 1; + } + } } /// Start processing a batch of tasks @@ -133,40 +197,84 @@ impl MetricsBatch { self.busy_duration_total += duration_as_u64(busy_duration); } - /// Start polling an individual task - pub(crate) fn start_poll(&mut self) { - self.poll_count += 1; - - if let Some(poll_timer) = &mut self.poll_timer { - poll_timer.poll_started_at = Instant::now(); + cfg_metrics_variant! { + stable: { + /// Start polling an individual task + pub(crate) fn start_poll(&mut self) {} + }, + unstable: { + /// Start polling an individual task + pub(crate) fn start_poll(&mut self) { + self.poll_count += 1; + if let Some(poll_timer) = &mut self.poll_timer { + poll_timer.poll_started_at = Instant::now(); + } + } } } - /// Stop polling an individual task - pub(crate) fn end_poll(&mut self) { - if let Some(poll_timer) = &mut self.poll_timer { - let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed()); - poll_timer.poll_counts.measure(elapsed, 1); + cfg_metrics_variant! { + stable: { + /// Stop polling an individual task + pub(crate) fn end_poll(&mut self) {} + }, + unstable: { + /// Stop polling an individual task + pub(crate) fn end_poll(&mut self) { + #[cfg(tokio_unstable)] + if let Some(poll_timer) = &mut self.poll_timer { + let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed()); + poll_timer.poll_counts.measure(elapsed, 1); + } + } } } - pub(crate) fn inc_local_schedule_count(&mut self) { - self.local_schedule_count += 1; + cfg_metrics_variant! { + stable: { + pub(crate) fn inc_local_schedule_count(&mut self) {} + }, + unstable: { + pub(crate) fn inc_local_schedule_count(&mut self) { + self.local_schedule_count += 1; + } + } } } cfg_rt_multi_thread! { impl MetricsBatch { - pub(crate) fn incr_steal_count(&mut self, by: u16) { - self.steal_count += by as u64; + cfg_metrics_variant! { + stable: { + pub(crate) fn incr_steal_count(&mut self, _by: u16) {} + }, + unstable: { + pub(crate) fn incr_steal_count(&mut self, by: u16) { + self.steal_count += by as u64; + } + } } - pub(crate) fn incr_steal_operations(&mut self) { - self.steal_operations += 1; + cfg_metrics_variant! { + stable: { + pub(crate) fn incr_steal_operations(&mut self) {} + }, + unstable: { + pub(crate) fn incr_steal_operations(&mut self) { + self.steal_operations += 1; + } + } } - pub(crate) fn incr_overflow_count(&mut self) { - self.overflow_count += 1; + cfg_metrics_variant! { + stable: { + pub(crate) fn incr_overflow_count(&mut self) {} + }, + unstable: { + pub(crate) fn incr_overflow_count(&mut self) { + self.overflow_count += 1; + } + } } } } diff --git a/tokio/src/runtime/metrics/mock.rs b/tokio/src/runtime/metrics/mock.rs index 777c13d8a83..6d70b2e1b2d 100644 --- a/tokio/src/runtime/metrics/mock.rs +++ b/tokio/src/runtime/metrics/mock.rs @@ -1,13 +1,7 @@ //! This file contains mocks of the types in src/runtime/metrics -use std::thread::ThreadId; - pub(crate) struct SchedulerMetrics {} -pub(crate) struct WorkerMetrics {} - -pub(crate) struct MetricsBatch {} - #[derive(Clone, Default)] pub(crate) struct HistogramBuilder {} @@ -19,41 +13,3 @@ impl SchedulerMetrics { /// Increment the number of tasks scheduled externally pub(crate) fn inc_remote_schedule_count(&self) {} } - -impl WorkerMetrics { - pub(crate) fn new() -> Self { - Self {} - } - - pub(crate) fn from_config(config: &crate::runtime::Config) -> Self { - // Prevent the dead-code warning from being triggered - let _ = &config.metrics_poll_count_histogram; - Self::new() - } - - pub(crate) fn set_queue_depth(&self, _len: usize) {} - pub(crate) fn set_thread_id(&self, _thread_id: ThreadId) {} -} - -impl MetricsBatch { - pub(crate) fn new(_: &WorkerMetrics) -> Self { - Self {} - } - - pub(crate) fn submit(&mut self, _to: &WorkerMetrics, _mean_poll_time: u64) {} - pub(crate) fn about_to_park(&mut self) {} - pub(crate) fn unparked(&mut self) {} - pub(crate) fn inc_local_schedule_count(&mut self) {} - pub(crate) fn start_processing_scheduled_tasks(&mut self) {} - pub(crate) fn end_processing_scheduled_tasks(&mut self) {} - pub(crate) fn start_poll(&mut self) {} - pub(crate) fn end_poll(&mut self) {} -} - -cfg_rt_multi_thread! { - impl MetricsBatch { - pub(crate) fn incr_steal_count(&mut self, _by: u16) {} - pub(crate) fn incr_steal_operations(&mut self) {} - pub(crate) fn incr_overflow_count(&mut self) {} - } -} diff --git a/tokio/src/runtime/metrics/mod.rs b/tokio/src/runtime/metrics/mod.rs index c4634a4b1ea..b2772f1842a 100644 --- a/tokio/src/runtime/metrics/mod.rs +++ b/tokio/src/runtime/metrics/mod.rs @@ -11,22 +11,23 @@ mod runtime; pub use runtime::RuntimeMetrics; +mod batch; +pub(crate) use batch::MetricsBatch; + +mod worker; +pub(crate) use worker::WorkerMetrics; + cfg_unstable_metrics! { - mod batch; - pub(crate) use batch::MetricsBatch; mod histogram; pub(crate) use histogram::{Histogram, HistogramBatch, HistogramBuilder}; + #[allow(unreachable_pub)] // rust-lang/rust#57411 pub use histogram::{HistogramScale, HistogramConfiguration, LogHistogram, LogHistogramBuilder, InvalidHistogramConfiguration}; - mod scheduler; pub(crate) use scheduler::SchedulerMetrics; - mod worker; - pub(crate) use worker::WorkerMetrics; - cfg_net! { mod io; pub(crate) use io::IoDriverMetrics; @@ -35,6 +36,5 @@ cfg_unstable_metrics! { cfg_not_unstable_metrics! { mod mock; - - pub(crate) use mock::{SchedulerMetrics, WorkerMetrics, MetricsBatch, HistogramBuilder}; + pub(crate) use mock::{SchedulerMetrics, HistogramBuilder}; } diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index adf5866a95d..b8f71a95f04 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -1,12 +1,13 @@ use crate::runtime::Handle; +use std::time::Duration; + +cfg_64bit_metrics! { + use std::sync::atomic::Ordering::Relaxed; +} cfg_unstable_metrics! { use std::ops::Range; use std::thread::ThreadId; - cfg_64bit_metrics! { - use std::sync::atomic::Ordering::Relaxed; - } - use std::time::Duration; } /// Handle to the runtime's metrics. @@ -96,6 +97,54 @@ impl RuntimeMetrics { self.handle.inner.injection_queue_depth() } + cfg_64bit_metrics! { + /// Returns the amount of time the given worker thread has been busy. + /// + /// The worker busy duration starts at zero when the runtime is created and + /// increases whenever the worker is spending time processing work. Using + /// this value can indicate the load of the given worker. If a lot of time + /// is spent busy, then the worker is under load and will check for inbound + /// events less often. + /// + /// The timer is monotonically increasing. It is never decremented or reset + /// to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_total_busy_duration(0); + /// println!("worker 0 was busy for a total of {:?}", n); + /// } + /// ``` + pub fn worker_total_busy_duration(&self, worker: usize) -> Duration { + let nanos = self + .handle + .inner + .worker_metrics(worker) + .busy_duration_total + .load(Relaxed); + Duration::from_nanos(nanos) + } + } + cfg_unstable_metrics! { /// Returns the number of additional threads spawned by the runtime. @@ -543,52 +592,6 @@ impl RuntimeMetrics { .load(Relaxed) } - /// Returns the amount of time the given worker thread has been busy. - /// - /// The worker busy duration starts at zero when the runtime is created and - /// increases whenever the worker is spending time processing work. Using - /// this value can indicate the load of the given worker. If a lot of time - /// is spent busy, then the worker is under load and will check for inbound - /// events less often. - /// - /// The timer is monotonically increasing. It is never decremented or reset - /// to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.worker_total_busy_duration(0); - /// println!("worker 0 was busy for a total of {:?}", n); - /// } - /// ``` - pub fn worker_total_busy_duration(&self, worker: usize) -> Duration { - let nanos = self - .handle - .inner - .worker_metrics(worker) - .busy_duration_total - .load(Relaxed); - Duration::from_nanos(nanos) - } - /// Returns the number of tasks scheduled from **within** the runtime on the /// given worker's local queue. /// diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index 29804a08798..2ecd53f6791 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -1,10 +1,13 @@ -use crate::runtime::metrics::Histogram; use crate::runtime::Config; use crate::util::metric_atomics::{MetricAtomicU64, MetricAtomicUsize}; use std::sync::atomic::Ordering::Relaxed; use std::sync::Mutex; use std::thread::ThreadId; +cfg_unstable_metrics! { + use crate::runtime::metrics::Histogram; +} + /// Retrieve runtime worker metrics. /// /// **Note**: This is an [unstable API][unstable]. The public API of this type @@ -15,74 +18,95 @@ use std::thread::ThreadId; #[derive(Debug, Default)] #[repr(align(128))] pub(crate) struct WorkerMetrics { + /// Amount of time the worker spent doing work vs. parking. + pub(crate) busy_duration_total: MetricAtomicU64, + + /// Number of tasks currently in the local queue. Used only by the + /// current-thread scheduler. + pub(crate) queue_depth: MetricAtomicUsize, + + /// Thread id of worker thread. + thread_id: Mutex>, + + #[cfg(tokio_unstable)] /// Number of times the worker parked. pub(crate) park_count: MetricAtomicU64, + #[cfg(tokio_unstable)] /// Number of times the worker parked and unparked. pub(crate) park_unpark_count: MetricAtomicU64, + #[cfg(tokio_unstable)] /// Number of times the worker woke then parked again without doing work. pub(crate) noop_count: MetricAtomicU64, + #[cfg(tokio_unstable)] /// Number of tasks the worker stole. pub(crate) steal_count: MetricAtomicU64, + #[cfg(tokio_unstable)] /// Number of times the worker stole pub(crate) steal_operations: MetricAtomicU64, + #[cfg(tokio_unstable)] /// Number of tasks the worker polled. pub(crate) poll_count: MetricAtomicU64, + #[cfg(tokio_unstable)] /// EWMA task poll time, in nanoseconds. pub(crate) mean_poll_time: MetricAtomicU64, - /// Amount of time the worker spent doing work vs. parking. - pub(crate) busy_duration_total: MetricAtomicU64, - + #[cfg(tokio_unstable)] /// Number of tasks scheduled for execution on the worker's local queue. pub(crate) local_schedule_count: MetricAtomicU64, + #[cfg(tokio_unstable)] /// Number of tasks moved from the local queue to the global queue to free space. pub(crate) overflow_count: MetricAtomicU64, - /// Number of tasks currently in the local queue. Used only by the - /// current-thread scheduler. - pub(crate) queue_depth: MetricAtomicUsize, - + #[cfg(tokio_unstable)] /// If `Some`, tracks the number of polls by duration range. pub(super) poll_count_histogram: Option, - - /// Thread id of worker thread. - thread_id: Mutex>, } impl WorkerMetrics { - pub(crate) fn from_config(config: &Config) -> WorkerMetrics { - let mut worker_metrics = WorkerMetrics::new(); - worker_metrics.poll_count_histogram = config - .metrics_poll_count_histogram - .as_ref() - .map(|histogram_builder| histogram_builder.build()); - worker_metrics - } - pub(crate) fn new() -> WorkerMetrics { WorkerMetrics::default() } - pub(crate) fn queue_depth(&self) -> usize { - self.queue_depth.load(Relaxed) - } - pub(crate) fn set_queue_depth(&self, len: usize) { self.queue_depth.store(len, Relaxed); } - pub(crate) fn thread_id(&self) -> Option { - *self.thread_id.lock().unwrap() - } - pub(crate) fn set_thread_id(&self, thread_id: ThreadId) { *self.thread_id.lock().unwrap() = Some(thread_id); } + + cfg_metrics_variant! { + stable: { + pub(crate) fn from_config(_: &Config) -> WorkerMetrics { + WorkerMetrics::new() + } + }, + unstable: { + pub(crate) fn from_config(config: &Config) -> WorkerMetrics { + let mut worker_metrics = WorkerMetrics::new(); + worker_metrics.poll_count_histogram = config + .metrics_poll_count_histogram + .as_ref() + .map(|histogram_builder| histogram_builder.build()); + worker_metrics + } + } + } + + cfg_unstable_metrics! { + pub(crate) fn queue_depth(&self) -> usize { + self.queue_depth.load(Relaxed) + } + + pub(crate) fn thread_id(&self) -> Option { + *self.thread_id.lock().unwrap() + } + } } diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index 37f37a4e9e3..112374916c3 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -570,6 +570,11 @@ impl Handle { pub(crate) fn injection_queue_depth(&self) -> usize { self.shared.inject.len() } + + pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { + assert_eq!(0, worker); + &self.shared.worker_metrics + } } cfg_unstable_metrics! { @@ -578,11 +583,6 @@ cfg_unstable_metrics! { &self.shared.scheduler_metrics } - pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { - assert_eq!(0, worker); - &self.shared.worker_metrics - } - pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { self.worker_metrics(worker).queue_depth() } diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index e0a1b20b5bc..3d19300a61a 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -9,6 +9,8 @@ cfg_rt! { pub(crate) use inject::Inject; use crate::runtime::TaskHooks; + + use crate::runtime::WorkerMetrics; } cfg_rt_multi_thread! { @@ -236,10 +238,14 @@ cfg_rt! { pub(crate) fn injection_queue_depth(&self) -> usize { match_flavor!(self, Handle(handle) => handle.injection_queue_depth()) } + + pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { + match_flavor!(self, Handle(handle) => handle.worker_metrics(worker)) + } } cfg_unstable_metrics! { - use crate::runtime::{SchedulerMetrics, WorkerMetrics}; + use crate::runtime::SchedulerMetrics; impl Handle { cfg_64bit_metrics! { @@ -260,10 +266,6 @@ cfg_rt! { match_flavor!(self, Handle(handle) => handle.scheduler_metrics()) } - pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { - match_flavor!(self, Handle(handle) => handle.worker_metrics(worker)) - } - pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { match_flavor!(self, Handle(handle) => handle.worker_local_queue_depth(worker)) } diff --git a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs index 21795dbbc2f..985495c7561 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs @@ -1,7 +1,8 @@ use super::Handle; +use crate::runtime::WorkerMetrics; cfg_unstable_metrics! { - use crate::runtime::{SchedulerMetrics, WorkerMetrics}; + use crate::runtime::SchedulerMetrics; } impl Handle { @@ -17,6 +18,10 @@ impl Handle { self.shared.injection_queue_depth() } + pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { + &self.shared.worker_metrics[worker] + } + cfg_unstable_metrics! { cfg_64bit_metrics! { pub(crate) fn spawned_tasks_count(&self) -> u64 { @@ -39,10 +44,6 @@ impl Handle { &self.shared.scheduler_metrics } - pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { - &self.shared.worker_metrics[worker] - } - pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { self.shared.worker_local_queue_depth(worker) } diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index ad3b0e367e0..cf0900c1e6f 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -82,6 +82,49 @@ fn global_queue_depth_multi_thread() { panic!("exhausted every try to block the runtime"); } +#[test] +fn worker_total_busy_duration() { + const N: usize = 5; + + let zero = Duration::from_millis(0); + + let rt = current_thread(); + let metrics = rt.metrics(); + + rt.block_on(async { + for _ in 0..N { + tokio::spawn(async { + tokio::task::yield_now().await; + }) + .await + .unwrap(); + } + }); + + drop(rt); + + assert!(zero < metrics.worker_total_busy_duration(0)); + + let rt = threaded(); + let metrics = rt.metrics(); + + rt.block_on(async { + for _ in 0..N { + tokio::spawn(async { + tokio::task::yield_now().await; + }) + .await + .unwrap(); + } + }); + + drop(rt); + + for i in 0..metrics.num_workers() { + assert!(zero < metrics.worker_total_busy_duration(i)); + } +} + fn try_block_threaded(rt: &Runtime) -> Result>, mpsc::RecvTimeoutError> { let (tx, rx) = mpsc::channel(); diff --git a/tokio/tests/rt_unstable_metrics.rs b/tokio/tests/rt_unstable_metrics.rs index 60cdc525ff1..05486e56b6c 100644 --- a/tokio/tests/rt_unstable_metrics.rs +++ b/tokio/tests/rt_unstable_metrics.rs @@ -593,49 +593,6 @@ fn worker_poll_count_histogram_disabled_without_explicit_enable() { } } -#[test] -fn worker_total_busy_duration() { - const N: usize = 5; - - let zero = Duration::from_millis(0); - - let rt = current_thread(); - let metrics = rt.metrics(); - - rt.block_on(async { - for _ in 0..N { - tokio::spawn(async { - tokio::task::yield_now().await; - }) - .await - .unwrap(); - } - }); - - drop(rt); - - assert!(zero < metrics.worker_total_busy_duration(0)); - - let rt = threaded(); - let metrics = rt.metrics(); - - rt.block_on(async { - for _ in 0..N { - tokio::spawn(async { - tokio::task::yield_now().await; - }) - .await - .unwrap(); - } - }); - - drop(rt); - - for i in 0..metrics.num_workers() { - assert!(zero < metrics.worker_total_busy_duration(i)); - } -} - #[test] fn worker_local_schedule_count() { let rt = current_thread();