Skip to content

Commit

Permalink
Split lookup data hashtable 16 ways
Browse files Browse the repository at this point in the history
The goal here is to allow for more concurrency when inserting data by
breaking up the single `Mutex` that guards the `DataBuilder` by sharding
it S ways, each with its own `Mutex`.

The method chosen here is intentionally not particulary clever and just
distributes entries among the 16 hashmaps based on their hash. This has
the benefit of making lookups fairly simple, at the cost of us not being
able to fully exploit knowledge we have about our data layout (e.g. how
many shards we have in input and we know the input is sorted).

Downside is obviously one extra layer of indirection to go through when
doing lookups.

The constant of 16 has been chosen somewhat arbitrarily: I think it
should correlate well with how many CPUs we have available. In any case,
bumping it up to 128 made performance far worse.

Testing with our 100G dataset suggests that if S = 1, it'll take 40-45
minutes to upload all data; with S = 16, it'll take 19-25 minutes.

Bug: 337062283
Change-Id: I55ac133f2587df93c73b41748738252078eb0131
  • Loading branch information
andrisaar committed May 7, 2024
1 parent 3acd7b5 commit a27a379
Show file tree
Hide file tree
Showing 12 changed files with 93 additions and 86 deletions.
6 changes: 3 additions & 3 deletions oak_functions_containers_app/src/native_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tempfile::{tempdir, TempDir};
struct RequestContext {
request: Vec<u8>,
response: Vec<u8>,
lookup_data: LookupData,
lookup_data: LookupData<16>,
}

thread_local! {
Expand Down Expand Up @@ -129,7 +129,7 @@ struct SharedLibrary {
/// Variant of a Handler that dynamically loads a `.so` file and invokes native
/// code to handle requests from there.
pub struct NativeHandler {
lookup_data_manager: Arc<LookupDataManager>,
lookup_data_manager: Arc<LookupDataManager<16>>,

#[allow(dead_code)]
observer: Option<Arc<dyn Observer + Send + Sync>>,
Expand Down Expand Up @@ -198,7 +198,7 @@ impl Handler for NativeHandler {
fn new_handler(
_config: (),
module_bytes: &[u8],
lookup_data_manager: Arc<LookupDataManager>,
lookup_data_manager: Arc<LookupDataManager<16>>,
observer: Option<Arc<dyn Observer + Send + Sync>>,
) -> anyhow::Result<NativeHandler> {
let directory = tempdir().context("could not create temporary directory")?;
Expand Down
2 changes: 1 addition & 1 deletion oak_functions_containers_app/tests/native_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn test_native_handler() {
.expect("failed to read test library");

let logger = Arc::new(StandaloneLogger);
let lookup_data_manager = Arc::new(LookupDataManager::new_empty(logger));
let lookup_data_manager = Arc::new(LookupDataManager::<1>::new_empty(logger));
lookup_data_manager
.extend_next_lookup_data([("key_0".as_bytes(), "value_0".as_bytes())].into_iter());

Expand Down
27 changes: 17 additions & 10 deletions oak_functions_sdk/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ lazy_static! {
#[tokio::test]
async fn test_read_write() {
let logger = Arc::new(StandaloneLogger);
let lookup_data_manager = Arc::new(LookupDataManager::for_test(Vec::default(), logger.clone()));
let lookup_data_manager =
Arc::new(LookupDataManager::<1>::for_test(Vec::default(), logger.clone()));
let api_factory = StdWasmApiFactory { lookup_data_manager };

let wasm_handler =
Expand All @@ -66,7 +67,8 @@ async fn test_read_write() {
#[tokio::test]
async fn test_double_read() {
let logger = Arc::new(StandaloneLogger);
let lookup_data_manager = Arc::new(LookupDataManager::for_test(Vec::default(), logger.clone()));
let lookup_data_manager =
Arc::new(LookupDataManager::<1>::for_test(Vec::default(), logger.clone()));
let api_factory = StdWasmApiFactory { lookup_data_manager };

let wasm_handler =
Expand All @@ -81,7 +83,8 @@ async fn test_double_read() {
#[tokio::test]
async fn test_double_write() {
let logger = Arc::new(StandaloneLogger);
let lookup_data_manager = Arc::new(LookupDataManager::for_test(Vec::default(), logger.clone()));
let lookup_data_manager =
Arc::new(LookupDataManager::<1>::for_test(Vec::default(), logger.clone()));
let api_factory = StdWasmApiFactory { lookup_data_manager };

let wasm_handler =
Expand All @@ -96,7 +99,8 @@ async fn test_double_write() {
#[tokio::test]
async fn test_write_log() {
let logger = Arc::new(StandaloneLogger);
let lookup_data_manager = Arc::new(LookupDataManager::for_test(Vec::default(), logger.clone()));
let lookup_data_manager =
Arc::new(LookupDataManager::<1>::for_test(Vec::default(), logger.clone()));
let api_factory = StdWasmApiFactory { lookup_data_manager };

let wasm_handler =
Expand All @@ -113,7 +117,7 @@ async fn test_storage_get_item() {
let entries = Vec::from_iter([(b"StorageGet".to_vec(), b"StorageGetResponse".to_vec())]);

let logger = Arc::new(StandaloneLogger);
let lookup_data_manager = Arc::new(LookupDataManager::for_test(entries, logger.clone()));
let lookup_data_manager = Arc::new(LookupDataManager::<1>::for_test(entries, logger.clone()));
let api_factory = StdWasmApiFactory { lookup_data_manager };

let wasm_handler =
Expand All @@ -131,7 +135,7 @@ async fn test_storage_get_item_not_found() {
let entries = Vec::default();

let logger = Arc::new(StandaloneLogger);
let lookup_data_manager = Arc::new(LookupDataManager::for_test(entries, logger.clone()));
let lookup_data_manager = Arc::new(LookupDataManager::<1>::for_test(entries, logger.clone()));
let api_factory = StdWasmApiFactory { lookup_data_manager };

let wasm_handler =
Expand All @@ -150,7 +154,7 @@ async fn test_storage_get_item_huge_key() {
let entries = Vec::from_iter([(bytes.clone(), bytes.clone())]);

let logger = Arc::new(StandaloneLogger);
let lookup_data_manager = Arc::new(LookupDataManager::for_test(entries, logger.clone()));
let lookup_data_manager = Arc::new(LookupDataManager::<1>::for_test(entries, logger.clone()));
let api_factory = StdWasmApiFactory { lookup_data_manager };

let wasm_handler =
Expand All @@ -168,7 +172,8 @@ async fn test_echo() {
let logger = Arc::new(StandaloneLogger);
let message_to_echo = "ECHO";

let lookup_data_manager = Arc::new(LookupDataManager::for_test(Vec::default(), logger.clone()));
let lookup_data_manager =
Arc::new(LookupDataManager::<1>::for_test(Vec::default(), logger.clone()));
let api_factory = StdWasmApiFactory { lookup_data_manager };

let wasm_handler =
Expand All @@ -189,7 +194,8 @@ async fn test_blackhole() {
let logger = Arc::new(StandaloneLogger);
let message_to_blackhole = "BLACKHOLE";

let lookup_data_manager = Arc::new(LookupDataManager::for_test(Vec::default(), logger.clone()));
let lookup_data_manager =
Arc::new(LookupDataManager::<1>::for_test(Vec::default(), logger.clone()));
let api_factory = StdWasmApiFactory { lookup_data_manager };

let wasm_handler =
Expand All @@ -210,7 +216,8 @@ async fn test_huge_response() {

let logger = Arc::new(StandaloneLogger);

let lookup_data_manager = Arc::new(LookupDataManager::for_test(Vec::default(), logger.clone()));
let lookup_data_manager =
Arc::new(LookupDataManager::<1>::for_test(Vec::default(), logger.clone()));
let api_factory = StdWasmApiFactory { lookup_data_manager };

let wasm_handler =
Expand Down
2 changes: 1 addition & 1 deletion oak_functions_service/benches/wasm_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ fn create_test_data(start: i32, end: i32) -> HashMap<Vec<u8>, Vec<u8>> {

struct TestState<H: Handler> {
wasm_handler: H::HandlerType,
lookup_data_manager: Arc<LookupDataManager>,
lookup_data_manager: Arc<LookupDataManager<16>>,
}

fn create_test_state_with_wasm_module_name<H: Handler>(wasm_module_name: &str) -> TestState<H> {
Expand Down
2 changes: 1 addition & 1 deletion oak_functions_service/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
};

pub struct OakFunctionsInstance<H: Handler> {
lookup_data_manager: Arc<LookupDataManager>,
lookup_data_manager: Arc<LookupDataManager<16>>,
wasm_handler: H::HandlerType,
}

Expand Down
2 changes: 1 addition & 1 deletion oak_functions_service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub trait Handler {
fn new_handler(
config: Self::HandlerConfig,
wasm_module_bytes: &[u8],
lookup_data_manager: Arc<LookupDataManager>,
lookup_data_manager: Arc<LookupDataManager<16>>,
observer: Option<Arc<dyn Observer + Send + Sync>>,
) -> anyhow::Result<Self::HandlerType>;

Expand Down
Loading

0 comments on commit a27a379

Please sign in to comment.