Skip to content

Commit

Permalink
Add OriginContext to track data across modules (#875)
Browse files Browse the repository at this point in the history
Adds OriginContext that can have data set on it about the origin
and read out in any module at any time. This is designed to allow
us to track "requests" (usually network requests) as it traverses
through the system.

The most immediate use is to track what hashing algorithm was
requested for VerifyStore and verify the data using it.

In the future we'll likely use this for other things, like tracking
a "user" and emit what resources that user used over time and emit
it to another system based on a config.
  • Loading branch information
allada authored May 7, 2024
1 parent 185eab3 commit 829904e
Show file tree
Hide file tree
Showing 11 changed files with 554 additions and 86 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
bazel-*
target/
.vscode/
.cache
.terraform*
.config
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions nativelink-macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ pub fn nativelink_test(attr: TokenStream, item: TokenStream) -> TokenStream {
#[tokio::test(#attr)]
async fn #fn_name(#fn_inputs) #fn_output {
#[warn(clippy::disallowed_methods)]
{
#fn_block
}
::std::sync::Arc::new(::nativelink_util::origin_context::OriginContext::new()).wrap_async(
::nativelink_util::__tracing::trace_span!("test"), async move {
#fn_block
}
)
.await
}
};

Expand Down
7 changes: 4 additions & 3 deletions nativelink-service/src/health_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::StreamExt;
Expand All @@ -22,7 +23,7 @@ use hyper::{Body, Request, Response, StatusCode};
use nativelink_util::health_utils::{
HealthRegistry, HealthStatus, HealthStatusDescription, HealthStatusReporter,
};
use nativelink_util::task::instrument_future;
use nativelink_util::origin_context::OriginContext;
use tower::Service;
use tracing::error_span;

Expand Down Expand Up @@ -51,7 +52,8 @@ impl Service<Request<hyper::Body>> for HealthServer {

fn call(&mut self, _req: Request<Body>) -> Self::Future {
let health_registry = self.health_registry.clone();
Box::pin(instrument_future(
Box::pin(Arc::new(OriginContext::new()).wrap_async(
error_span!("health_server_call"),
async move {
let health_status_descriptions: Vec<HealthStatusDescription> =
health_registry.health_status_report().collect().await;
Expand Down Expand Up @@ -81,7 +83,6 @@ impl Service<Request<hyper::Body>> for HealthServer {
.unwrap()),
}
},
error_span!("health_server_call"),
))
}
}
11 changes: 6 additions & 5 deletions nativelink-store/tests/filesystem_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ use nativelink_store::filesystem_store::{
use nativelink_util::buf_channel::make_buf_channel_pair;
use nativelink_util::common::{fs, DigestInfo};
use nativelink_util::evicting_map::LenEntry;
use nativelink_util::origin_context::ContextAwareFuture;
use nativelink_util::store_trait::{Store, UploadSizeInfo};
use nativelink_util::task::instrument_future;
use nativelink_util::{background_spawn, spawn};
use once_cell::sync::Lazy;
use rand::{thread_rng, Rng};
Expand All @@ -51,6 +51,7 @@ use tokio::sync::Barrier;
use tokio::time::sleep;
use tokio_stream::wrappers::ReadDirStream;
use tokio_stream::StreamExt;
use tracing::Instrument;

trait FileEntryHooks {
fn on_unref<Fe: FileEntry>(_entry: &Fe) {}
Expand Down Expand Up @@ -167,24 +168,24 @@ impl<Hooks: FileEntryHooks + 'static + Sync + Send> Drop for TestFileEntry<Hooks
// command that will wait for all tasks and sub spawns to complete.
// Sadly we need to rely on `active_drop_spawns` to hit zero to ensure that
// all tasks have completed.
let fut = instrument_future(
let fut = ContextAwareFuture::new_from_active(
async move {
// Drop the FileEntryImpl in a controlled setting then wait for the
// `active_drop_spawns` to hit zero.
drop(inner);
while shared_context.active_drop_spawns.load(Ordering::Acquire) > 0 {
tokio::task::yield_now().await;
}
},
tracing::error_span!("test_file_entry_drop"),
}
.instrument(tracing::error_span!("test_file_entry_drop")),
);
#[allow(clippy::disallowed_methods)]
let thread_handle = {
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
rt.block_on(fut)
rt.block_on(fut);
})
};
thread_handle.join().unwrap();
Expand Down
2 changes: 2 additions & 0 deletions nativelink-util/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ rust_library(
"src/health_utils.rs",
"src/lib.rs",
"src/metrics_utils.rs",
"src/origin_context.rs",
"src/platform_properties.rs",
"src/proto_stream_utils.rs",
"src/resource_info.rs",
Expand All @@ -42,6 +43,7 @@ rust_library(
"@crates//:bytes",
"@crates//:futures",
"@crates//:hex",
"@crates//:hyper",
"@crates//:lru",
"@crates//:parking_lot",
"@crates//:pin-project-lite",
Expand Down
1 change: 1 addition & 0 deletions nativelink-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ blake3 = { version = "1.5.1", features = ["mmap"] }
bytes = "1.6.0"
futures = "0.3.30"
hex = "0.4.3"
hyper = "0.14.28"
lru = "0.12.3"
parking_lot = "0.12.2"
pin-project-lite = "0.2.14"
Expand Down
4 changes: 4 additions & 0 deletions nativelink-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod fastcdc;
pub mod fs;
pub mod health_utils;
pub mod metrics_utils;
pub mod origin_context;
pub mod platform_properties;
pub mod proto_stream_utils;
pub mod resource_info;
Expand All @@ -30,3 +31,6 @@ pub mod store_trait;
pub mod task;
pub mod tls_utils;
pub mod write_counter;

// Re-export tracing mostly for use in macros.
pub use tracing as __tracing;
Loading

0 comments on commit 829904e

Please sign in to comment.