From 829904eed7a42f72d7b1a951effde436b68f2b4c Mon Sep 17 00:00:00 2001 From: "Nathan (Blaise) Bruer" Date: Tue, 7 May 2024 10:57:10 -0700 Subject: [PATCH] Add OriginContext to track data across modules (#875) Adds OriginContext that can have data set on it about the origin and read out in any module at any time. This is designed to allow us to track "requests" (usually network requests) as it traverses through the system. The most immediate use is to track what hashing algorithm was requested for VerifyStore and verify the data using it. In the future we'll likely use this for other things, like tracking a "user" and emit what resources that user used over time and emit it to another system based on a config. --- .gitignore | 1 + Cargo.lock | 1 + nativelink-macro/src/lib.rs | 9 +- nativelink-service/src/health_server.rs | 7 +- .../tests/filesystem_store_test.rs | 11 +- nativelink-util/BUILD.bazel | 2 + nativelink-util/Cargo.toml | 1 + nativelink-util/src/lib.rs | 4 + nativelink-util/src/origin_context.rs | 366 ++++++++++++++++++ nativelink-util/src/task.rs | 57 ++- src/bin/nativelink.rs | 181 +++++---- 11 files changed, 554 insertions(+), 86 deletions(-) create mode 100644 nativelink-util/src/origin_context.rs diff --git a/.gitignore b/.gitignore index ec1bac05a..8cd9245cf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ bazel-* target/ +.vscode/ .cache .terraform* .config diff --git a/Cargo.lock b/Cargo.lock index 89185c9a7..06e6f306c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1922,6 +1922,7 @@ dependencies = [ "bytes", "futures", "hex", + "hyper", "lru", "mock_instant", "nativelink-config", diff --git a/nativelink-macro/src/lib.rs b/nativelink-macro/src/lib.rs index a9527d868..0883fc15d 100644 --- a/nativelink-macro/src/lib.rs +++ b/nativelink-macro/src/lib.rs @@ -35,9 +35,12 @@ pub fn nativelink_test(attr: TokenStream, item: TokenStream) -> TokenStream { #[tokio::test(#attr)] async fn #fn_name(#fn_inputs) #fn_output { 
#[warn(clippy::disallowed_methods)] - { - #fn_block - } + ::std::sync::Arc::new(::nativelink_util::origin_context::OriginContext::new()).wrap_async( + ::nativelink_util::__tracing::trace_span!("test"), async move { + #fn_block + } + ) + .await } }; diff --git a/nativelink-service/src/health_server.rs b/nativelink-service/src/health_server.rs index dfce9d18e..c84e0ad19 100644 --- a/nativelink-service/src/health_server.rs +++ b/nativelink-service/src/health_server.rs @@ -14,6 +14,7 @@ use std::future::Future; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use futures::StreamExt; @@ -22,7 +23,7 @@ use hyper::{Body, Request, Response, StatusCode}; use nativelink_util::health_utils::{ HealthRegistry, HealthStatus, HealthStatusDescription, HealthStatusReporter, }; -use nativelink_util::task::instrument_future; +use nativelink_util::origin_context::OriginContext; use tower::Service; use tracing::error_span; @@ -51,7 +52,8 @@ impl Service> for HealthServer { fn call(&mut self, _req: Request) -> Self::Future { let health_registry = self.health_registry.clone(); - Box::pin(instrument_future( + Box::pin(Arc::new(OriginContext::new()).wrap_async( + error_span!("health_server_call"), async move { let health_status_descriptions: Vec = health_registry.health_status_report().collect().await; @@ -81,7 +83,6 @@ impl Service> for HealthServer { .unwrap()), } }, - error_span!("health_server_call"), )) } } diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs index 95ab9b753..9209c4ac8 100644 --- a/nativelink-store/tests/filesystem_store_test.rs +++ b/nativelink-store/tests/filesystem_store_test.rs @@ -40,8 +40,8 @@ use nativelink_store::filesystem_store::{ use nativelink_util::buf_channel::make_buf_channel_pair; use nativelink_util::common::{fs, DigestInfo}; use nativelink_util::evicting_map::LenEntry; +use nativelink_util::origin_context::ContextAwareFuture; use nativelink_util::store_trait::{Store, 
UploadSizeInfo}; -use nativelink_util::task::instrument_future; use nativelink_util::{background_spawn, spawn}; use once_cell::sync::Lazy; use rand::{thread_rng, Rng}; @@ -51,6 +51,7 @@ use tokio::sync::Barrier; use tokio::time::sleep; use tokio_stream::wrappers::ReadDirStream; use tokio_stream::StreamExt; +use tracing::Instrument; trait FileEntryHooks { fn on_unref(_entry: &Fe) {} @@ -167,7 +168,7 @@ impl Drop for TestFileEntry Drop for TestFileEntry 0 { tokio::task::yield_now().await; } - }, - tracing::error_span!("test_file_entry_drop"), + } + .instrument(tracing::error_span!("test_file_entry_drop")), ); #[allow(clippy::disallowed_methods)] let thread_handle = { @@ -184,7 +185,7 @@ impl Drop for TestFileEntry { + #[no_mangle] + #[used] + pub static $name: $crate::origin_context::NLSymbol<$type> = + $crate::origin_context::NLSymbol { + name: concat!(module_path!(), "::", stringify!($name)), + _phantom: std::marker::PhantomData {}, + }; + }; +} + +pub struct NLSymbol { + pub name: &'static str, + pub _phantom: std::marker::PhantomData, +} + +impl Symbol for NLSymbol { + type Type = T; + + fn name(&self) -> &'static str { + self.name + } +} + +pub type RawSymbol = std::os::raw::c_void; + +pub trait Symbol { + type Type: 'static; + + fn name(&self) -> &'static str { + std::any::type_name::() + } + + fn as_ptr(&'static self) -> *const RawSymbol { + self as *const Self as *const RawSymbol + } +} + +/// Simple wrapper around a raw symbol pointer. +/// This allows us to bypass the unsafe undefined behavior check +/// when using raw pointers by manually implementing Send and Sync. +#[derive(Eq, PartialEq, Hash, Clone)] +#[repr(transparent)] +pub struct RawSymbolWrapper(pub *const RawSymbol); + +unsafe impl Send for RawSymbolWrapper {} +unsafe impl Sync for RawSymbolWrapper {} + +/// Context used to store data about the origin of a request. 
+#[derive(Default, Clone)] +pub struct OriginContext { + data: HashMap>, +} + +impl OriginContext { + /// Creates a new (empty) context. + pub fn new() -> Self { + Self::default() + } + + /// Sets the value for a given symbol on the context. + pub fn set_value( + &mut self, + symbol: &'static impl Symbol, + value: Arc, + ) -> Option> { + self.data.insert(RawSymbolWrapper(symbol.as_ptr()), value) + } + + /// Gets the value currently set for a given symbol on the context. + #[inline] + pub fn get_value( + &self, + symbol: &'static impl Symbol, + ) -> Result>, Error> { + self.data + .get(&RawSymbolWrapper(symbol.as_ptr())) + .map_or(Ok(None), |value| { + Arc::downcast::(value.clone()).map_or_else( + |_| { + Err(make_err!( + Code::Internal, + "Failed to downcast symbol: {}", + symbol.name(), + )) + }, + |v| Ok(Some(v)), + ) + }) + } + + /// Consumes the context and runs the given function with the context set + /// as the active context. When the function exits the context is restored + /// to the previous global context. + #[inline] + pub fn run(self, span: Span, func: impl FnOnce() -> T) -> T { + Arc::new(self).wrap(span, func)() + } + + /// Wraps a function so when it is called the passed in context is set as + /// the active context and when the function exits the context is restored + /// to the previous global context. + #[inline] + fn wrap(self: Arc, span: Span, func: impl FnOnce() -> T) -> impl FnOnce() -> T { + move || { + let enter = Self::enter(self); + let result = span.in_scope(func); + enter.release(); + result + } + } + + /// Wraps a future so when it is called the passed in context is set as + /// the active context and when the future exits the context is restored + /// to the previous global context. + #[inline] + pub fn wrap_async( + self: Arc, + span: Span, + fut: impl Future, + ) -> impl Future { + ContextAwareFuture::new(Some(self), fut.instrument(span)) + } + + /// Spawns a future in the background with the given context. 
+ pub fn background_spawn( + self: Arc, + span: Span, + fut: impl Future + Send + 'static, + ) { + background_spawn!(span: span, ctx: Some(self), fut: fut); + } + + /// Enters the context, storing the previous context. If the returned + /// `ContextDropGuard` is dropped, the previous context is restored. + #[inline] + fn enter(self: Arc) -> ContextDropGuard { + ContextDropGuard::new(self) + } +} + +/// Static struct to interact with the active global context. +pub struct ActiveOriginContext; + +impl ActiveOriginContext { + /// Gets the active context for the current thread. + pub fn get() -> Option> { + GLOBAL_ORIGIN_CONTEXT.with_borrow(|maybe_ctx| maybe_ctx.clone()) + } + + /// Gets the value currently set for a given symbol on the + /// active context. + #[inline] + pub fn get_value( + symbol: &'static impl Symbol, + ) -> Result>, Error> { + GLOBAL_ORIGIN_CONTEXT.with_borrow(|maybe_ctx| { + maybe_ctx.as_ref().map_or_else( + || { + Err(make_err!( + Code::Internal, + "Expected active context to be set" + )) + }, + |ctx| ctx.get_value(symbol), + ) + }) + } + + /// Forks the active context, returning a new context with the same data. + #[inline] + pub fn fork() -> Result { + GLOBAL_ORIGIN_CONTEXT.with_borrow(|maybe_ctx| { + maybe_ctx.as_ref().map_or_else( + || { + Err(make_err!( + Code::Internal, + "Expected active context to be set" + )) + }, + |ctx| Ok(ctx.as_ref().clone()), + ) + }) + } +} + +thread_local! { + /// Global context that is used to store the active context. + static GLOBAL_ORIGIN_CONTEXT: RefCell>> = const { RefCell::new(None) }; +} + +/// Special guard struct that is used to hold the previous context and restore it +/// when the guard is released or dropped. 
+struct ContextDropGuard { + prev_ctx: Option>, + new_ctx_ptr: *const OriginContext, +} + +impl ContextDropGuard { + /// Places the new context in the global context, stores the previous context + /// and returns a new `ContextDropGuard`, so if it is ever released or dropped + /// the previous context will be restored. + #[inline] + fn new(new_ctx: Arc) -> Self { + let new_ctx_ptr = Arc::as_ptr(&new_ctx); + let prev_ctx = GLOBAL_ORIGIN_CONTEXT.replace(Some(new_ctx)); + Self { + prev_ctx, + new_ctx_ptr, + } + } + + /// Swap the global context with the previous context. + /// Returns the context used in the `.enter()`/`.new()` call. + fn restore_global_context(&mut self) -> Arc { + let new_ctx = GLOBAL_ORIGIN_CONTEXT + .replace(self.prev_ctx.take()) + .expect("Expected global context to be set"); + assert_eq!(self.new_ctx_ptr, Arc::as_ptr(&new_ctx)); + new_ctx + } + + /// Release the context guard and restore the previous context. + /// This is an optimization, so we don't need to clone our Arc + /// if the caller can use the context after releasing it. + #[inline] + fn release(mut self) -> Arc { + let new_ctx = self.restore_global_context(); + std::mem::forget(self); // Prevent the destructor from being called. + new_ctx + } +} + +impl Drop for ContextDropGuard { + #[inline] + fn drop(&mut self) { + // If the user calls `.release()` the drop() will not be triggered. + // If a panic happens, the drop() will be called and the `.release()` + // will not be called, this is our safety net. + self.restore_global_context(); + } +} + +pin_project! { + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct ContextAwareFuture { + // `ManuallyDrop` is used so we can call `self.span.enter()` in the `drop()` + // of our inner future, then drop the span. 
+ #[pin] + inner: ManuallyDrop>, + context: Option>, + } + + impl PinnedDrop for ContextAwareFuture { + #[inline] + fn drop(this: Pin<&mut Self>) { + let this = this.project(); + // Note: If the future panics, the context will not be restored, so + // this is a best effort to provide access to our global context + // in the destructors in the event of a panic. + let _enter = this.context.take().map(|ctx| ctx.enter()); + // SAFETY: 1. `Pin::get_unchecked_mut()` is safe, because this isn't + // different from wrapping `T` in `Option` and calling + // `Pin::set(&mut this.inner, None)`, except avoiding + // additional memory overhead. + // 2. `ManuallyDrop::drop()` is safe, because + // `PinnedDrop::drop()` is guaranteed to be called only + // once. + unsafe { + ManuallyDrop::drop(this.inner.get_unchecked_mut()) + } + } + } +} + +impl ContextAwareFuture { + /// Utility function to create a new `ContextAwareFuture` from the + /// active context. + #[must_use = "futures do nothing unless you `.await` or poll them"] + #[inline] + pub fn new_from_active(inner: Instrumented) -> ContextAwareFuture { + match ActiveOriginContext::get() { + Some(ctx) => ContextAwareFuture::new(Some(ctx), inner), + None => { + // Useful to get tracing stack trace. + tracing::error!("OriginContext must be set"); + panic!("OriginContext must be set"); + } + } + } + + #[must_use = "futures do nothing unless you `.await` or poll them"] + #[inline] + pub(crate) fn new(context: Option>, inner: Instrumented) -> Self { + Self { + inner: ManuallyDrop::new(inner), + context, + } + } +} + +impl Future for ContextAwareFuture { + type Output = T::Output; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let Some(ctx) = this.context.take() else { + // Useful to get tracing stack trace. 
+ tracing::error!("Expected context to be set"); + panic!("Expected context to be set"); + }; + let enter = ctx.enter(); + // Since `inner` is only moved when the future itself is dropped, `inner` will + // never move, so this should be safe. + // see: https://docs.rs/tracing/0.1.40/src/tracing/instrument.rs.html#297 + let inner = unsafe { this.inner.map_unchecked_mut(|v| &mut **v) }; + let result = inner.poll(cx); + assert!( + this.context.replace(enter.release()).is_none(), + "Expected context to be unset" + ); + result + } +} diff --git a/nativelink-util/src/task.rs b/nativelink-util/src/task.rs index 91deb5f03..d1f6a0c5f 100644 --- a/nativelink-util/src/task.rs +++ b/nativelink-util/src/task.rs @@ -13,19 +13,31 @@ // limitations under the License. use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use futures::Future; +use hyper::rt::Executor; +use tokio::sync::mpsc::UnboundedSender; use tokio::task::{spawn_blocking, JoinError, JoinHandle}; pub use tracing::error_span as __error_span; -use tracing::{Instrument, Span}; +use tracing::instrument::Instrumented; +use tracing::{event, info_span, Instrument, Level, Span}; + +use crate::origin_context::{ActiveOriginContext, ContextAwareFuture, OriginContext}; #[inline(always)] -pub fn instrument_future(f: F, span: Span) -> impl Future +pub fn __spawn_with_span_and_context( + f: F, + span: Span, + ctx: Option>, +) -> JoinHandle where + T: Send + 'static, F: Future + Send + 'static, { - f.instrument(span) + #[allow(clippy::disallowed_methods)] + tokio::spawn(ContextAwareFuture::new(ctx, f.instrument(span))) } #[inline(always)] @@ -34,8 +46,7 @@ where T: Send + 'static, F: Future + Send + 'static, { - #[allow(clippy::disallowed_methods)] - tokio::spawn(instrument_future(f, span)) + __spawn_with_span_and_context(f, span, ActiveOriginContext::get()) } #[inline(always)] @@ -59,6 +70,9 @@ macro_rules! 
background_spawn { (name: $name:expr, fut: $fut:expr, target: $target:expr, $($fields:tt)*) => {{ $crate::task::__spawn_with_span($fut, $crate::task::__error_span!(target: $target, $name, $($fields)*)) }}; + (span: $span:expr, ctx: $ctx:expr, fut: $fut:expr) => {{ + $crate::task::__spawn_with_span_and_context($fut, $span, $ctx) + }}; } #[macro_export] @@ -117,3 +131,36 @@ impl Drop for JoinHandleDropGuard { self.inner.abort(); } } + +pub struct TaskExecutor(UnboundedSender>); + +impl Clone for TaskExecutor { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl TaskExecutor { + pub fn new(tx: UnboundedSender>) -> Self { + Self(tx) + } +} + +impl Executor for TaskExecutor +where + Self: Send + Sync, + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + fn execute(&self, fut: F) { + let _ = self + .0 + .send(fut.instrument(info_span!("http_executor"))) + .inspect_err(|_| { + event!( + Level::ERROR, + "Could not dispatch future from TaskExecutor(hyper) to parent spawn." 
+ ); + }); + } +} diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index a3f745bcd..53b77e365 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -21,7 +21,8 @@ use async_lock::Mutex as AsyncMutex; use axum::Router; use clap::Parser; use futures::future::{select_all, BoxFuture, OptionFuture, TryFutureExt}; -use futures::FutureExt; +use futures::stream::FuturesUnordered; +use futures::{FutureExt, StreamExt}; use hyper::server::conn::Http; use hyper::{Response, StatusCode}; use mimalloc::MiMalloc; @@ -48,9 +49,11 @@ use nativelink_util::metrics_utils::{ set_metrics_enabled_for_this_thread, Collector, CollectorState, Counter, MetricsComponent, Registry, }; +use nativelink_util::origin_context::OriginContext; use nativelink_util::store_trait::{ set_default_digest_size_health_check, DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG, }; +use nativelink_util::task::TaskExecutor; use nativelink_util::{background_spawn, spawn, spawn_blocking}; use nativelink_worker::local_worker::new_local_worker; use parking_lot::Mutex; @@ -66,7 +69,7 @@ use tokio_rustls::TlsAcceptor; use tonic::codec::CompressionEncoding; use tonic::transport::Server as TonicServer; use tower::util::ServiceExt; -use tracing::{event, Level}; +use tracing::{error_span, event, trace_span, Level}; use tracing_subscriber::filter::{EnvFilter, LevelFilter}; #[global_allocator] @@ -424,39 +427,44 @@ async fn inner_main( }; svc = svc.route_service( path, - axum::routing::get(move |_request: hyper::Request| async move { - // We spawn on a thread that can block to give more freedom to our metrics - // collection. This allows it to call functions like `tokio::block_in_place` - // if it needs to wait on a future. 
- spawn_blocking!("prometheus_metrics", move || { - let mut buf = String::new(); - let root_metrics_registry_guard = - futures::executor::block_on(root_metrics_registry.lock()); - prometheus_client::encoding::text::encode( - &mut buf, - &root_metrics_registry_guard, - ) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) - .map(|_| { - // This is a hack to get around this bug: https://github.com/prometheus/client_rust/issues/155 - buf = buf.replace("nativelink_nativelink_stores_", ""); - buf = buf.replace("nativelink_nativelink_workers_", ""); - let mut response = Response::new(buf); - // Per spec we should probably use `application/openmetrics-text; version=1.0.0; charset=utf-8` - // https://github.com/OpenObservability/OpenMetrics/blob/1386544931307dff279688f332890c31b6c5de36/specification/OpenMetrics.md#overall-structure - // However, this makes debugging more difficult, so we use the old text/plain instead. - response.headers_mut().insert( - hyper::header::CONTENT_TYPE, - hyper::header::HeaderValue::from_static( - "text/plain; version=0.0.4; charset=utf-8", - ), - ); - response - }) - .unwrap_or_else(error_to_response) - }) - .await - .unwrap_or_else(error_to_response) + axum::routing::get(move |_request: hyper::Request| { + Arc::new(OriginContext::new()).wrap_async( + trace_span!("prometheus_ctx"), + async move { + // We spawn on a thread that can block to give more freedom to our metrics + // collection. This allows it to call functions like `tokio::block_in_place` + // if it needs to wait on a future. 
+ spawn_blocking!("prometheus_metrics", move || { + let mut buf = String::new(); + let root_metrics_registry_guard = + futures::executor::block_on(root_metrics_registry.lock()); + prometheus_client::encoding::text::encode( + &mut buf, + &root_metrics_registry_guard, + ) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) + .map(|_| { + // This is a hack to get around this bug: https://github.com/prometheus/client_rust/issues/155 + buf = buf.replace("nativelink_nativelink_stores_", ""); + buf = buf.replace("nativelink_nativelink_workers_", ""); + let mut response = Response::new(buf); + // Per spec we should probably use `application/openmetrics-text; version=1.0.0; charset=utf-8` + // https://github.com/OpenObservability/OpenMetrics/blob/1386544931307dff279688f332890c31b6c5de36/specification/OpenMetrics.md#overall-structure + // However, this makes debugging more difficult, so we use the old text/plain instead. + response.headers_mut().insert( + hyper::header::CONTENT_TYPE, + hyper::header::HeaderValue::from_static( + "text/plain; version=0.0.4; charset=utf-8", + ), + ); + response + }) + .unwrap_or_else(error_to_response) + }) + .await + .unwrap_or_else(error_to_response) + }, + ) }), ) } @@ -678,45 +686,73 @@ async fn inner_main( } }, ); - let (http, svc) = (http.clone(), svc.clone()); - let fut = if let Some(tls_acceptor) = &maybe_tls_acceptor { - let tls_stream = match tls_acceptor.accept(tcp_stream).await { - Ok(result) => result, - Err(err) => { - event!(Level::ERROR, ?err, "Failed to accept tls stream"); - continue; - } - }; - http.serve_connection(tls_stream, svc).left_future() - } else { - http.serve_connection(tcp_stream, svc).right_future() - } - .map_ok_or_else( - |err| { - use std::error::Error; - if let Some(inner_err) = err.source() { - if let Some(io_err) = inner_err.downcast_ref::() { - if io_err.kind() == std::io::ErrorKind::NotConnected { - return Ok(()); - } - } - } - Err(err) - }, - Ok, + let (http, svc, maybe_tls_acceptor) = + 
(http.clone(), svc.clone(), maybe_tls_acceptor.clone()); + Arc::new(OriginContext::new()).background_spawn( + error_span!( + target: "nativelink::services", + "http_connection", + ?remote_addr, + ?socket_addr + ), + async move {}, ); background_spawn!( name: "http_connection", fut: async move { // Move it into our spawn, so if our spawn dies the cleanup happens. let _guard = scope_guard; - if let Err(err) = fut.await { - event!( - target: "nativelink::services", - Level::ERROR, - ?err, - "Failed running service" - ); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let http = http.with_executor(TaskExecutor::new(tx)); + let mut http_svc_fut = if let Some(tls_acceptor) = maybe_tls_acceptor { + let tls_stream = match tls_acceptor.accept(tcp_stream).await { + Ok(result) => result, + Err(err) => { + event!(Level::ERROR, ?err, "Failed to accept tls stream"); + return; + } + }; + http.serve_connection(tls_stream, svc).left_future() + } else { + http.serve_connection(tcp_stream, svc).right_future() + }; + let mut futures = FuturesUnordered::new(); + futures.push(futures::future::pending().right_future()); + loop { + tokio::select! { + maybe_new_future = rx.recv() => { + maybe_new_future.map(|fut| futures.push(fut.left_future())).unwrap_or_else(|| { + event!( + target: "nativelink::services", + Level::DEBUG, + ?remote_addr, + "Dropped new_future_receiver", + ) + }); + }, + result = &mut http_svc_fut => { + if let Err(err) = result.map_err(|err| { + use std::error::Error; + if let Some(inner_err) = err.source() { + if let Some(io_err) = inner_err.downcast_ref::() { + if io_err.kind() == std::io::ErrorKind::NotConnected { + return Ok(()); + } + } + } + Err(err) + }) { + event!( + target: "nativelink::services", + Level::ERROR, + ?err, + "Failed running service" + ); + } + return; // Once the service is done, we don't have any more work to do. + }, + _ = futures.next() => { /* This just pulls a pool of futures. 
*/ }, + }; } }, target: "nativelink::services", @@ -792,7 +828,9 @@ async fn inner_main( let worker_metrics = root_worker_metrics.sub_registry_with_prefix(&name); local_worker.register_metrics(worker_metrics); worker_names.insert(name.clone()); - spawn!("worker", local_worker.run(), ?name) + let fut = Arc::new(OriginContext::new()) + .wrap_async(trace_span!("worker_ctx"), local_worker.run()); + spawn!("worker", fut, ?name) } }; root_futures.push(Box::pin(spawn_fut.map_ok_or_else(|e| Err(e.into()), |v| v))); @@ -923,6 +961,9 @@ fn main() -> Result<(), Box> { std::process::exit(143); }); - runtime.block_on(inner_main(cfg, server_start_time)) + runtime.block_on( + Arc::new(OriginContext::new()) + .wrap_async(trace_span!("main"), inner_main(cfg, server_start_time)), + ) } }