From 324df369587b3d5cf90d4cd7abd6f88f4400732f Mon Sep 17 00:00:00 2001 From: Mike Keen Date: Sat, 28 Dec 2024 19:28:55 -0500 Subject: [PATCH 01/11] disallow name overlaps for stores --- nativelink-store/src/store_manager.rs | 15 +++++++++++++-- rust-toolchain.toml | 2 ++ src/bin/nativelink.rs | 3 ++- 3 files changed, 17 insertions(+), 3 deletions(-) create mode 100644 rust-toolchain.toml diff --git a/nativelink-store/src/store_manager.rs b/nativelink-store/src/store_manager.rs index 32efda709..ec99b7bb1 100644 --- a/nativelink-store/src/store_manager.rs +++ b/nativelink-store/src/store_manager.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; +use nativelink_error::{make_err, Code, Error}; use nativelink_metric::{MetricsComponent, RootMetricsComponent}; use nativelink_util::store_trait::Store; use parking_lot::RwLock; @@ -31,9 +32,19 @@ impl StoreManager { } } - pub fn add_store(&self, name: &str, store: Store) { + pub fn add_store(&self, name: &str, store: Store) -> Result<(), Error> { let mut stores = self.stores.write(); - stores.insert(name.to_string(), store); + match stores.contains_key(name) { + true => Err(make_err!( + Code::AlreadyExists, + "A store with the name '{}' already exists", + name + )), + _ => { + stores.insert(name.to_string(), store); + Ok(()) + } + } } pub fn get_store(&self, name: &str) -> Option { diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 000000000..bc68cdfea --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,2 @@ +[toolchain] +channel = "1.82.0" \ No newline at end of file diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index c41a74072..6a274eb56 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -197,7 +197,8 @@ async fn inner_main( let store = store_factory(&store_cfg, &store_manager, Some(&mut health_register_store)) .await .err_tip(|| format!("Failed to create store '{name}'"))?; - store_manager.add_store(&name, store); + + store_manager.add_store(&name, store).err_tip(|| format!("Failed to add store to manager '{name}'"))?; } } From 9f6701949be860c5798708d38a99ee9f2ff3280a Mon Sep 17 00:00:00 2001 From: Mike Keen Date: Sat, 28 Dec 2024 23:41:13 -0500 Subject: [PATCH 02/11] store flow improvements / anti-collision --- nativelink-config/src/stores.rs | 57 +++++ nativelink-service/BUILD.bazel | 1 + nativelink-service/tests/ac_server_test.rs | 32 ++- nativelink-service/tests/bep_server_test.rs | 17 +- .../tests/bytestream_server_test.rs | 17 +- nativelink-service/tests/cas_server_test.rs | 17 +- .../tests/store_overlap_rules_test.rs | 219 ++++++++++++++++++ nativelink-store/src/default_store_factory.rs | 34 ++- nativelink-store/src/store_manager.rs | 21 +- src/bin/nativelink.rs | 15 +- 10 files changed, 378 insertions(+), 52 deletions(-) create mode 100644 nativelink-service/tests/store_overlap_rules_test.rs diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs index 7e6e31821..75e9c4b6a 100644 --- a/nativelink-config/src/stores.rs +++ b/nativelink-config/src/stores.rs @@ -432,6 +432,21 @@ pub enum StoreSpec { noop(NoopSpec), } +impl StoreSpec { + // To enforce no duplicate connection configs for a store, add it to the matcher and implement + // disallow_duplicates_digest() on it. Returns `None` for stores that are not being enforced unique. + pub fn disallow_duplicates_digest(&self) -> Option { + match self { + Self::memory(spec) => Some(spec.disallow_duplicates_digest()), + Self::experimental_s3_store(spec) => Some(spec.disallow_duplicates_digest()), + Self::filesystem(spec) => Some(spec.disallow_duplicates_digest()), + Self::grpc(spec) => Some(spec.disallow_duplicates_digest()), + Self::redis_store(spec) => Some(spec.disallow_duplicates_digest()), + _ => None, + } + } +} + /// Configuration for an individual shard of the store. #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] @@ -514,6 +529,12 @@ pub struct FilesystemSpec { pub block_size: u64, } +impl FilesystemSpec { + fn disallow_duplicates_digest(&self) -> String { + format!("{}{}", self.content_path, self.temp_path) + } +} + #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct FastSlowSpec { @@ -535,6 +556,12 @@ pub struct MemorySpec { pub eviction_policy: Option, } +impl MemorySpec { + pub fn disallow_duplicates_digest(&self) -> String { + "InMemoryStore".into() + } +} + #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct DedupSpec { @@ -787,6 +814,14 @@ pub struct S3Spec { pub disable_http2: bool, } +impl S3Spec { + pub fn disallow_duplicates_digest(&self) -> String { + let key_prefix = self.key_prefix.as_deref().unwrap_or_default(); + + format!("{}{}{}", self.region, self.bucket, key_prefix) + } +} + #[allow(non_camel_case_types)] #[derive(Serialize, Deserialize, Debug, Clone, Copy)] pub enum StoreType { @@ -852,6 +887,22 @@ pub struct GrpcSpec { pub connections_per_endpoint: usize, } +impl GrpcSpec { + // todo: could improve duplication detection to individual endpoints to disallow accidental re-use + fn disallow_duplicates_digest(&self) -> String { + format!( + "{}{}", + self.instance_name, + self.endpoints + .clone() + .into_iter() + .map(|endpoint| endpoint.address) + .collect::>() + .join(",") + ) + } +} + /// The possible error codes that might occur on an upstream request. #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub enum ErrorCode { @@ -993,6 +1044,12 @@ pub struct RedisSpec { pub retry: Retry, } +impl RedisSpec { + fn disallow_duplicates_digest(&self) -> String { + format!("{}{}", self.addresses.clone().join(","), self.key_prefix) + } +} + #[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum RedisMode { diff --git a/nativelink-service/BUILD.bazel b/nativelink-service/BUILD.bazel index 99374b8bd..d28811d09 100644 --- a/nativelink-service/BUILD.bazel +++ b/nativelink-service/BUILD.bazel @@ -53,6 +53,7 @@ rust_test_suite( "tests/bytestream_server_test.rs", "tests/cas_server_test.rs", "tests/worker_api_server_test.rs", + "tests/store_overlap_rules_test.rs", ], proc_macro_deps = [ "//nativelink-macro", diff --git a/nativelink-service/tests/ac_server_test.rs b/nativelink-service/tests/ac_server_test.rs index ac7bb9b80..1eed0947a 100644 --- a/nativelink-service/tests/ac_server_test.rs +++ b/nativelink-service/tests/ac_server_test.rs @@ -25,7 +25,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{ digest_function, ActionResult, Digest, GetActionResultRequest, UpdateActionResultRequest, }; use nativelink_service::ac_server::AcServer; -use nativelink_store::default_store_factory::store_factory; +use nativelink_store::default_store_factory::make_and_add_store_to_manager; use nativelink_store::store_manager::StoreManager; use nativelink_util::common::DigestInfo; use nativelink_util::store_trait::StoreLike; @@ -53,24 +53,22 @@ async fn insert_into_store( async fn make_store_manager() -> Result, Error> { let store_manager = Arc::new(StoreManager::new()); - store_manager.add_store( + make_and_add_store_to_manager( "main_cas", - store_factory( - &StoreSpec::memory(MemorySpec::default()), - &store_manager, - None, - ) - .await?, - ); - store_manager.add_store( + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await?; + + make_and_add_store_to_manager( "main_ac", - store_factory( - &StoreSpec::memory(MemorySpec::default()), - &store_manager, - None, - ) - .await?, - ); + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await?; + Ok(store_manager) } diff --git a/nativelink-service/tests/bep_server_test.rs b/nativelink-service/tests/bep_server_test.rs index fe120a549..befe01054 100644 --- a/nativelink-service/tests/bep_server_test.rs +++ b/nativelink-service/tests/bep_server_test.rs @@ -35,7 +35,7 @@ use nativelink_proto::google::devtools::build::v1::{ PublishBuildToolEventStreamRequest, PublishLifecycleEventRequest, StreamId, }; use nativelink_service::bep_server::BepServer; -use nativelink_store::default_store_factory::store_factory; +use nativelink_store::default_store_factory::make_and_add_store_to_manager; use nativelink_store::store_manager::StoreManager; use nativelink_util::buf_channel::make_buf_channel_pair; use nativelink_util::channel_body_for_tests::ChannelBody; @@ -52,15 +52,14 @@ const BEP_STORE_NAME: &str = "main_bep"; /// Utility function to construct a [`StoreManager`] async fn make_store_manager() -> Result, Error> { let store_manager = Arc::new(StoreManager::new()); - store_manager.add_store( + make_and_add_store_to_manager( BEP_STORE_NAME, - store_factory( - &StoreSpec::memory(MemorySpec::default()), - &store_manager, - None, - ) - .await?, - ); + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await?; + Ok(store_manager) } diff --git a/nativelink-service/tests/bytestream_server_test.rs b/nativelink-service/tests/bytestream_server_test.rs index 97154e804..991bef6b5 100644 --- a/nativelink-service/tests/bytestream_server_test.rs +++ b/nativelink-service/tests/bytestream_server_test.rs @@ -34,7 +34,7 @@ use nativelink_proto::google::bytestream::{ QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, WriteRequest, WriteResponse, }; use nativelink_service::bytestream_server::ByteStreamServer; -use nativelink_store::default_store_factory::store_factory; +use nativelink_store::default_store_factory::make_and_add_store_to_manager; use nativelink_store::store_manager::StoreManager; use nativelink_util::channel_body_for_tests::ChannelBody; use nativelink_util::common::{encode_stream_proto, DigestInfo}; @@ -58,15 +58,14 @@ const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789a async fn make_store_manager() -> Result, Error> { let store_manager = Arc::new(StoreManager::new()); - store_manager.add_store( + make_and_add_store_to_manager( "main_cas", - store_factory( - &StoreSpec::memory(MemorySpec::default()), - &store_manager, - None, - ) - .await?, - ); + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await?; + Ok(store_manager) } diff --git a/nativelink-service/tests/cas_server_test.rs b/nativelink-service/tests/cas_server_test.rs index a9b6ffe34..642f7054e 100644 --- a/nativelink-service/tests/cas_server_test.rs +++ b/nativelink-service/tests/cas_server_test.rs @@ -30,7 +30,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{ use nativelink_proto::google::rpc::Status as GrpcStatus; use nativelink_service::cas_server::CasServer; use nativelink_store::ac_utils::serialize_and_upload_message; -use nativelink_store::default_store_factory::store_factory; +use nativelink_store::default_store_factory::make_and_add_store_to_manager; use nativelink_store::store_manager::StoreManager; use nativelink_util::common::DigestInfo; use nativelink_util::digest_hasher::DigestHasherFunc; @@ -47,15 +47,14 @@ const BAD_HASH: &str = "BAD_HASH"; async fn make_store_manager() -> Result, Error> { let store_manager = Arc::new(StoreManager::new()); - store_manager.add_store( + make_and_add_store_to_manager( "main_cas", - store_factory( - &StoreSpec::memory(MemorySpec::default()), - &store_manager, - None, - ) - .await?, - ); + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await?; + Ok(store_manager) } diff --git a/nativelink-service/tests/store_overlap_rules_test.rs b/nativelink-service/tests/store_overlap_rules_test.rs new file mode 100644 index 000000000..1eed0947a --- /dev/null +++ b/nativelink-service/tests/store_overlap_rules_test.rs @@ -0,0 +1,219 @@ +// Copyright 2024 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::pin::Pin; +use std::sync::Arc; + +use bytes::BytesMut; +use maplit::hashmap; +use nativelink_config::stores::{MemorySpec, StoreSpec}; +use nativelink_error::Error; +use nativelink_macro::nativelink_test; +use nativelink_proto::build::bazel::remote::execution::v2::action_cache_server::ActionCache; +use nativelink_proto::build::bazel::remote::execution::v2::{ + digest_function, ActionResult, Digest, GetActionResultRequest, UpdateActionResultRequest, +}; +use nativelink_service::ac_server::AcServer; +use nativelink_store::default_store_factory::make_and_add_store_to_manager; +use nativelink_store::store_manager::StoreManager; +use nativelink_util::common::DigestInfo; +use nativelink_util::store_trait::StoreLike; +use pretty_assertions::assert_eq; +use prost::Message; +use tonic::{Code, Request, Response, Status}; + +const INSTANCE_NAME: &str = "foo_instance_name"; +const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef"; +const HASH1_SIZE: i64 = 147; + +async fn insert_into_store( + store: Pin<&impl StoreLike>, + hash: &str, + action_size: i64, + action_result: &T, +) -> Result> { + let mut store_data = BytesMut::new(); + action_result.encode(&mut store_data)?; + let data_len = store_data.len(); + let digest = DigestInfo::try_new(hash, action_size)?; + store.update_oneshot(digest, store_data.freeze()).await?; + Ok(data_len.try_into().unwrap()) +} + +async fn make_store_manager() -> Result, Error> { + let store_manager = Arc::new(StoreManager::new()); + make_and_add_store_to_manager( + "main_cas", + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await?; + + make_and_add_store_to_manager( + "main_ac", + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await?; + + Ok(store_manager) +} + +fn make_ac_server(store_manager: &StoreManager) -> Result { + AcServer::new( + &hashmap! { + "foo_instance_name".to_string() => nativelink_config::cas_server::AcStoreConfig{ + ac_store: "main_ac".to_string(), + read_only: false, + } + }, + store_manager, + ) +} + +async fn get_action_result( + ac_server: &AcServer, + hash: &str, + size: i64, +) -> Result, Status> { + ac_server + .get_action_result(Request::new(GetActionResultRequest { + instance_name: INSTANCE_NAME.to_string(), + action_digest: Some(Digest { + hash: hash.to_string(), + size_bytes: size, + }), + inline_stdout: false, + inline_stderr: false, + inline_output_files: vec![], + digest_function: digest_function::Value::Sha256.into(), + })) + .await +} + +#[nativelink_test] +async fn empty_store() -> Result<(), Box> { + let store_manager = make_store_manager().await?; + let ac_server = make_ac_server(&store_manager)?; + + let raw_response = get_action_result(&ac_server, HASH1, 0).await; + + let err = raw_response.unwrap_err(); + assert_eq!(err.code(), Code::NotFound); + assert!(err.message().is_empty()); + Ok(()) +} + +#[nativelink_test] +async fn has_single_item() -> Result<(), Box> { + let store_manager = make_store_manager().await?; + let ac_server = make_ac_server(&store_manager)?; + let ac_store = store_manager.get_store("main_ac").unwrap(); + + let action_result = ActionResult { + exit_code: 45, + ..Default::default() + }; + + insert_into_store(ac_store.as_pin(), HASH1, HASH1_SIZE, &action_result).await?; + let raw_response = get_action_result(&ac_server, HASH1, HASH1_SIZE).await; + + assert!( + raw_response.is_ok(), + "Expected value, got error {raw_response:?}" + ); + assert_eq!(raw_response.unwrap().into_inner(), action_result); + Ok(()) +} + +#[nativelink_test] +async fn single_item_wrong_digest_size() -> Result<(), Box> { + let store_manager = make_store_manager().await?; + let ac_server = make_ac_server(&store_manager)?; + let ac_store = store_manager.get_store("main_ac").unwrap(); + + let action_result = ActionResult { + exit_code: 45, + ..Default::default() + }; + + insert_into_store(ac_store.as_pin(), HASH1, HASH1_SIZE, &action_result).await?; + let raw_response = get_action_result(&ac_server, HASH1, HASH1_SIZE - 1).await; + + let err = raw_response.unwrap_err(); + assert_eq!(err.code(), Code::NotFound); + assert!(err.message().is_empty()); + Ok(()) +} + +fn get_encoded_proto_size(proto: &T) -> Result> { + let mut store_data = Vec::new(); + proto.encode(&mut store_data)?; + Ok(store_data.len()) +} + +async fn update_action_result( + ac_server: &AcServer, + digest: Digest, + action_result: ActionResult, +) -> Result, Status> { + ac_server + .update_action_result(Request::new(UpdateActionResultRequest { + instance_name: INSTANCE_NAME.to_string(), + action_digest: Some(digest), + action_result: Some(action_result), + results_cache_policy: None, + digest_function: digest_function::Value::Sha256.into(), + })) + .await +} + +#[nativelink_test] +async fn one_item_update_test() -> Result<(), Box> { + let store_manager = make_store_manager().await?; + let ac_server = make_ac_server(&store_manager)?; + let ac_store = store_manager.get_store("main_ac").unwrap(); + + let action_result = ActionResult { + exit_code: 45, + ..Default::default() + }; + + let size_bytes = get_encoded_proto_size(&action_result)? as i64; + + let raw_response = update_action_result( + &ac_server, + Digest { + hash: HASH1.to_string(), + size_bytes, + }, + action_result.clone(), + ) + .await; + + assert!( + raw_response.is_ok(), + "Expected success, got error {raw_response:?}" + ); + assert_eq!(raw_response.unwrap().into_inner(), action_result); + + let digest = DigestInfo::try_new(HASH1, size_bytes)?; + let raw_data = ac_store.get_part_unchunked(digest, 0, None).await?; + + let decoded_action_result = ActionResult::decode(raw_data)?; + assert_eq!(decoded_action_result, action_result); + Ok(()) +} diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs index 506ef6752..0e97f1ed2 100644 --- a/nativelink-store/src/default_store_factory.rs +++ b/nativelink-store/src/default_store_factory.rs @@ -42,12 +42,44 @@ use crate::verify_store::VerifyStore; type FutureMaybeStore<'a> = Box> + 'a>; -pub fn store_factory<'a>( +pub async fn make_and_add_store_to_manager<'a>( + name: &'a str, + backend: &'a StoreSpec, + store_manager: &'a Arc, + maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>, +) -> Result<(), Error> { + match store_factory(backend, store_manager, maybe_health_registry_builder).await { + Ok(store) => match store_manager.add_store(name, store) { + Ok(_) => { + if let Some(digest) = backend.disallow_duplicates_digest() { + store_manager.digest_not_already_present(&digest)?; + store_manager.config_digest_add(digest); + } + + Ok(()) + } + + Err(e) => { + return Err(e); + } + }, + + Err(e) => { + return Err(e); + } + } +} + +fn store_factory<'a>( backend: &'a StoreSpec, store_manager: &'a Arc, maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>, ) -> Pin> { Box::pin(async move { + if let Some(backend_config_digest) = backend.disallow_duplicates_digest() { + store_manager.digest_not_already_present(&backend_config_digest)?; + } + let store: Arc = match backend { StoreSpec::memory(spec) => MemoryStore::new(spec), StoreSpec::experimental_s3_store(spec) => S3Store::new(spec, SystemTime::now).await?, diff --git a/nativelink-store/src/store_manager.rs b/nativelink-store/src/store_manager.rs index ec99b7bb1..c567d1fda 100644 --- a/nativelink-store/src/store_manager.rs +++ b/nativelink-store/src/store_manager.rs @@ -23,18 +23,21 @@ use parking_lot::RwLock; pub struct StoreManager { #[metric] stores: RwLock>, + store_config_anti_collision_digests: RwLock>, } impl StoreManager { pub fn new() -> StoreManager { StoreManager { stores: RwLock::new(HashMap::new()), + store_config_anti_collision_digests: RwLock::new(vec![]), } } pub fn add_store(&self, name: &str, store: Store) -> Result<(), Error> { + let stores_rd = self.stores.read(); let mut stores = self.stores.write(); - match stores.contains_key(name) { + match stores_rd.contains_key(name) { true => Err(make_err!( Code::AlreadyExists, "A store with the name '{}' already exists", @@ -47,6 +50,22 @@ impl StoreManager { } } + pub fn digest_not_already_present(&self, digest: &str) -> Result<(), Error> { + let digests = self.store_config_anti_collision_digests.read(); + match digests.contains(&String::from(digest)) { + true => Err(make_err!( + Code::AlreadyExists, + "the provided config is already being used by another store" + )), + _ => Ok(()), + } + } + + pub fn config_digest_add(&self, digest: String) { + let mut digests = self.store_config_anti_collision_digests.write(); + digests.push(digest); + } + pub fn get_store(&self, name: &str) -> Option { let stores = self.stores.read(); if let Some(store) = stores.get(name) { diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index 6a274eb56..5896b8d4e 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -45,7 +45,7 @@ use nativelink_service::cas_server::CasServer; use nativelink_service::execution_server::ExecutionServer; use nativelink_service::health_server::HealthServer; use nativelink_service::worker_api_server::WorkerApiServer; -use nativelink_store::default_store_factory::store_factory; +use nativelink_store::default_store_factory::make_and_add_store_to_manager; use nativelink_store::store_manager::StoreManager; use nativelink_util::action_messages::WorkerId; use nativelink_util::common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit}; @@ -194,11 +194,14 @@ async fn inner_main( let health_component_name = format!("stores/{name}"); let mut health_register_store = health_registry_lock.sub_builder(&health_component_name); - let store = store_factory(&store_cfg, &store_manager, Some(&mut health_register_store)) - .await - .err_tip(|| format!("Failed to create store '{name}'"))?; - - store_manager.add_store(&name, store).err_tip(|| format!("Failed to add store to manager '{name}'"))?; + make_and_add_store_to_manager( + &name, + &store_cfg, + &store_manager, + Some(&mut health_register_store), + ) + .await + .err_tip(|| format!("Failed to create store '{name}'"))?; } } From ccca3a6446b9e027788cb028cc011915a1ac80bc Mon Sep 17 00:00:00 2001 From: Mike Keen Date: Sun, 29 Dec 2024 00:32:00 -0500 Subject: [PATCH 03/11] remove toolchain toml from changeset --- rust-toolchain.toml | 2 -- 1 file changed, 2 deletions(-) delete mode 100644 rust-toolchain.toml diff --git a/rust-toolchain.toml b/rust-toolchain.toml deleted file mode 100644 index bc68cdfea..000000000 --- a/rust-toolchain.toml +++ /dev/null @@ -1,2 +0,0 @@ -[toolchain] -channel = "1.82.0" \ No newline at end of file From 6bb7cc2e2db6a40478695488728b7df98f962ff3 Mon Sep 17 00:00:00 2001 From: Mike Keen Date: Sun, 29 Dec 2024 12:57:41 -0500 Subject: [PATCH 04/11] fix deadlock and tests now failing for the right reason (duplicate data source) --- .../tests/store_overlap_rules_test.rs | 2 -- nativelink-store/src/default_store_factory.rs | 30 +++++-------------- nativelink-store/src/store_manager.rs | 13 ++++---- src/bin/nativelink.rs | 2 +- 4 files changed, 14 insertions(+), 33 deletions(-) diff --git a/nativelink-service/tests/store_overlap_rules_test.rs b/nativelink-service/tests/store_overlap_rules_test.rs index 1eed0947a..968ddfdf1 100644 --- a/nativelink-service/tests/store_overlap_rules_test.rs +++ b/nativelink-service/tests/store_overlap_rules_test.rs @@ -60,7 +60,6 @@ async fn make_store_manager() -> Result, Error> { None, ) .await?; - make_and_add_store_to_manager( "main_ac", &StoreSpec::memory(MemorySpec::default()), @@ -68,7 +67,6 @@ async fn make_store_manager() -> Result, Error> { None, ) .await?; - Ok(store_manager) } diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs index 0e97f1ed2..6ade408c1 100644 --- a/nativelink-store/src/default_store_factory.rs +++ b/nativelink-store/src/default_store_factory.rs @@ -48,26 +48,15 @@ pub async fn make_and_add_store_to_manager<'a>( store_manager: &'a Arc, maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>, ) -> Result<(), Error> { - match store_factory(backend, store_manager, maybe_health_registry_builder).await { - Ok(store) => match store_manager.add_store(name, store) { - Ok(_) => { - if let Some(digest) = backend.disallow_duplicates_digest() { - store_manager.digest_not_already_present(&digest)?; - store_manager.config_digest_add(digest); - } - - Ok(()) - } - - Err(e) => { - return Err(e); - } - }, - - Err(e) => { - return Err(e); - } + if let Some(digest) = backend.disallow_duplicates_digest() { + store_manager.digest_not_already_present(&digest)?; + store_manager.config_digest_add(digest); } + + let store = store_factory(backend, store_manager, maybe_health_registry_builder).await?; + store_manager.add_store(name, store)?; + + Ok(()) } fn store_factory<'a>( @@ -76,9 +65,6 @@ fn store_factory<'a>( maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>, ) -> Pin> { Box::pin(async move { - if let Some(backend_config_digest) = backend.disallow_duplicates_digest() { - store_manager.digest_not_already_present(&backend_config_digest)?; - } let store: Arc = match backend { StoreSpec::memory(spec) => MemoryStore::new(spec), diff --git a/nativelink-store/src/store_manager.rs b/nativelink-store/src/store_manager.rs index c567d1fda..46ba2c0a3 100644 --- a/nativelink-store/src/store_manager.rs +++ b/nativelink-store/src/store_manager.rs @@ -35,19 +35,16 @@ impl StoreManager { } pub fn add_store(&self, name: &str, store: Store) -> Result<(), Error> { - let stores_rd = self.stores.read(); let mut stores = self.stores.write(); - match stores_rd.contains_key(name) { - true => Err(make_err!( + if stores.contains_key(name) { + return Err(make_err!( Code::AlreadyExists, "A store with the name '{}' already exists", name - )), - _ => { - stores.insert(name.to_string(), store); - Ok(()) - } + )); } + stores.insert(name.to_string(), store); + Ok(()) } pub fn digest_not_already_present(&self, digest: &str) -> Result<(), Error> { diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index 5896b8d4e..4dcd2f0eb 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -201,7 +201,7 @@ async fn inner_main( Some(&mut health_register_store), ) .await - .err_tip(|| format!("Failed to create store '{name}'"))?; + .err_tip(|| format!("Failed to create store '{name}'"))?; } } From 01f23a70549b7235a223d13f0d22dfe47c28d2f7 Mon Sep 17 00:00:00 2001 From: Mike Keen Date: Sun, 29 Dec 2024 14:11:12 -0500 Subject: [PATCH 05/11] =?UTF-8?q?tests=20looking=20=F0=9F=91=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- nativelink-service/tests/ac_server_test.rs | 15 +- .../tests/store_overlap_rules_test.rs | 192 +----------------- 2 files changed, 23 insertions(+), 184 deletions(-) diff --git a/nativelink-service/tests/ac_server_test.rs b/nativelink-service/tests/ac_server_test.rs index 1eed0947a..5531456eb 100644 --- a/nativelink-service/tests/ac_server_test.rs +++ b/nativelink-service/tests/ac_server_test.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use bytes::BytesMut; use maplit::hashmap; -use nativelink_config::stores::{MemorySpec, StoreSpec}; +use nativelink_config::stores::{EvictionPolicy, FilesystemSpec, MemorySpec, StoreSpec}; use nativelink_error::Error; use nativelink_macro::nativelink_test; use nativelink_proto::build::bazel::remote::execution::v2::action_cache_server::ActionCache; @@ -63,7 +63,18 @@ async fn make_store_manager() -> Result, Error> { make_and_add_store_to_manager( "main_ac", - &StoreSpec::memory(MemorySpec::default()), + &StoreSpec::filesystem(FilesystemSpec { + content_path: "/tmp/nativelink/testing/ac/content_path".into(), + temp_path: "/tmp/nativelink/testing/ac/tmp_path".into(), + read_buffer_size: 100, + block_size: 100, + eviction_policy: Some(EvictionPolicy { + max_bytes: 1_000_000_000, + evict_bytes: 10000, + max_seconds: 500, + max_count: 1_000_000, + }), + }), &store_manager, None, ) diff --git a/nativelink-service/tests/store_overlap_rules_test.rs b/nativelink-service/tests/store_overlap_rules_test.rs index 968ddfdf1..8506ad02b 100644 --- a/nativelink-service/tests/store_overlap_rules_test.rs +++ b/nativelink-service/tests/store_overlap_rules_test.rs @@ -12,206 +12,34 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::pin::Pin; use std::sync::Arc; -use bytes::BytesMut; -use maplit::hashmap; use nativelink_config::stores::{MemorySpec, StoreSpec}; use nativelink_error::Error; use nativelink_macro::nativelink_test; -use nativelink_proto::build::bazel::remote::execution::v2::action_cache_server::ActionCache; -use nativelink_proto::build::bazel::remote::execution::v2::{ - digest_function, ActionResult, Digest, GetActionResultRequest, UpdateActionResultRequest, -}; -use nativelink_service::ac_server::AcServer; use nativelink_store::default_store_factory::make_and_add_store_to_manager; use nativelink_store::store_manager::StoreManager; -use nativelink_util::common::DigestInfo; -use nativelink_util::store_trait::StoreLike; -use pretty_assertions::assert_eq; -use prost::Message; -use tonic::{Code, Request, Response, Status}; -const INSTANCE_NAME: &str = "foo_instance_name"; -const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef"; -const HASH1_SIZE: i64 = 147; - -async fn insert_into_store( - store: Pin<&impl StoreLike>, - hash: &str, - action_size: i64, - action_result: &T, -) -> Result> { - let mut store_data = BytesMut::new(); - action_result.encode(&mut store_data)?; - let data_len = store_data.len(); - let digest = DigestInfo::try_new(hash, action_size)?; - store.update_oneshot(digest, store_data.freeze()).await?; - Ok(data_len.try_into().unwrap()) -} - -async fn make_store_manager() -> Result, Error> { +#[nativelink_test] +async fn same_datasource() -> Result<(), Error> { let store_manager = Arc::new(StoreManager::new()); - make_and_add_store_to_manager( + assert!(make_and_add_store_to_manager( "main_cas", &StoreSpec::memory(MemorySpec::default()), &store_manager, None, ) - .await?; - make_and_add_store_to_manager( - "main_ac", + .await + .is_ok()); + + assert!(make_and_add_store_to_manager( + "main_cas", &StoreSpec::memory(MemorySpec::default()), &store_manager, None, ) - .await?; - Ok(store_manager) -} - -fn make_ac_server(store_manager: &StoreManager) -> Result { - AcServer::new( - &hashmap! { - "foo_instance_name".to_string() => nativelink_config::cas_server::AcStoreConfig{ - ac_store: "main_ac".to_string(), - read_only: false, - } - }, - store_manager, - ) -} - -async fn get_action_result( - ac_server: &AcServer, - hash: &str, - size: i64, -) -> Result, Status> { - ac_server - .get_action_result(Request::new(GetActionResultRequest { - instance_name: INSTANCE_NAME.to_string(), - action_digest: Some(Digest { - hash: hash.to_string(), - size_bytes: size, - }), - inline_stdout: false, - inline_stderr: false, - inline_output_files: vec![], - digest_function: digest_function::Value::Sha256.into(), - })) - .await -} - -#[nativelink_test] -async fn empty_store() -> Result<(), Box> { - let store_manager = make_store_manager().await?; - let ac_server = make_ac_server(&store_manager)?; - - let raw_response = get_action_result(&ac_server, HASH1, 0).await; - - let err = raw_response.unwrap_err(); - assert_eq!(err.code(), Code::NotFound); - assert!(err.message().is_empty()); - Ok(()) -} - -#[nativelink_test] -async fn has_single_item() -> Result<(), Box> { - let store_manager = make_store_manager().await?; - let ac_server = make_ac_server(&store_manager)?; - let ac_store = store_manager.get_store("main_ac").unwrap(); - - let action_result = ActionResult { - exit_code: 45, - ..Default::default() - }; - - insert_into_store(ac_store.as_pin(), HASH1, HASH1_SIZE, &action_result).await?; - let raw_response = get_action_result(&ac_server, HASH1, HASH1_SIZE).await; - - assert!( - raw_response.is_ok(), - "Expected value, got error {raw_response:?}" - ); - assert_eq!(raw_response.unwrap().into_inner(), action_result); - Ok(()) -} - -#[nativelink_test] -async fn single_item_wrong_digest_size() -> Result<(), Box> { - let store_manager = make_store_manager().await?; - let ac_server = make_ac_server(&store_manager)?; - let ac_store = store_manager.get_store("main_ac").unwrap(); - - let action_result = ActionResult { - exit_code: 45, - ..Default::default() - }; - - insert_into_store(ac_store.as_pin(), HASH1, HASH1_SIZE, &action_result).await?; - let raw_response = get_action_result(&ac_server, HASH1, HASH1_SIZE - 1).await; - - let err = raw_response.unwrap_err(); - assert_eq!(err.code(), Code::NotFound); - assert!(err.message().is_empty()); - Ok(()) -} - -fn get_encoded_proto_size(proto: &T) -> Result> { - let mut store_data = Vec::new(); - proto.encode(&mut store_data)?; - Ok(store_data.len()) -} - -async fn update_action_result( - ac_server: &AcServer, - digest: Digest, - action_result: ActionResult, -) -> Result, Status> { - ac_server - .update_action_result(Request::new(UpdateActionResultRequest { - instance_name: INSTANCE_NAME.to_string(), - action_digest: Some(digest), - action_result: Some(action_result), - results_cache_policy: None, - digest_function: digest_function::Value::Sha256.into(), - })) - .await -} - -#[nativelink_test] -async fn one_item_update_test() -> Result<(), Box> { - let store_manager = make_store_manager().await?; - let ac_server = make_ac_server(&store_manager)?; - let ac_store = store_manager.get_store("main_ac").unwrap(); - - let action_result = ActionResult { - exit_code: 45, - ..Default::default() - }; - - let size_bytes = get_encoded_proto_size(&action_result)? as i64; - - let raw_response = update_action_result( - &ac_server, - Digest { - hash: HASH1.to_string(), - size_bytes, - }, - action_result.clone(), - ) - .await; - - assert!( - raw_response.is_ok(), - "Expected success, got error {raw_response:?}" - ); - assert_eq!(raw_response.unwrap().into_inner(), action_result); - - let digest = DigestInfo::try_new(HASH1, size_bytes)?; - let raw_data = ac_store.get_part_unchunked(digest, 0, None).await?; + .await + .is_err()); - let decoded_action_result = ActionResult::decode(raw_data)?; - assert_eq!(decoded_action_result, action_result); Ok(()) } From 4a4d11add8d02c9245e84c50284a8f437f368755 Mon Sep 17 00:00:00 2001 From: Mike Keen Date: Sun, 29 Dec 2024 14:19:52 -0500 Subject: [PATCH 06/11] adhere to formatting rules --- nativelink-store/src/default_store_factory.rs | 3 +-- src/bin/nativelink.rs | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs index 6ade408c1..db1d89e91 100644 --- a/nativelink-store/src/default_store_factory.rs +++ b/nativelink-store/src/default_store_factory.rs @@ -55,7 +55,7 @@ pub async fn make_and_add_store_to_manager<'a>( let store = store_factory(backend, store_manager, maybe_health_registry_builder).await?; store_manager.add_store(name, store)?; - + Ok(()) } @@ -65,7 +65,6 @@ fn store_factory<'a>( maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>, ) -> Pin> { Box::pin(async move { - let store: Arc = match backend { StoreSpec::memory(spec) => MemoryStore::new(spec), StoreSpec::experimental_s3_store(spec) => S3Store::new(spec, SystemTime::now).await?, diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index 4dcd2f0eb..5896b8d4e 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -201,7 +201,7 @@ async fn inner_main( Some(&mut health_register_store), ) .await - .err_tip(|| format!("Failed to create store '{name}'"))?; + .err_tip(|| format!("Failed to create store '{name}'"))?; } } From d361abfaa58deeac2cfef3d3e49742e922a1a54f Mon Sep 17 00:00:00 2001 From: Mike Keen Date: Sun, 29 Dec 2024 14:25:14 -0500 Subject: [PATCH 07/11] formatting in the .bazel file --- nativelink-service/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nativelink-service/BUILD.bazel b/nativelink-service/BUILD.bazel index d28811d09..92209cf17 100644 --- a/nativelink-service/BUILD.bazel +++ b/nativelink-service/BUILD.bazel @@ -52,8 +52,8 @@ rust_test_suite( "tests/bep_server_test.rs", "tests/bytestream_server_test.rs", "tests/cas_server_test.rs", - "tests/worker_api_server_test.rs", "tests/store_overlap_rules_test.rs", + "tests/worker_api_server_test.rs", ], proc_macro_deps = [ "//nativelink-macro", From 91a355738febe50bbaaa13ec30900dda712b999d Mon Sep 17 00:00:00 2001 From: Mike Keen Date: Sun, 29 Dec 2024 15:11:54 -0500 Subject: [PATCH 08/11] some test fixes --- .../tests/store_overlap_rules_test.rs | 26 +++++++++++- nativelink-store/tests/redis_store_test.rs | 2 +- nativelink-store/tests/ref_store_test.rs | 40 +++++++++++-------- 3 files changed, 49 insertions(+), 19 deletions(-) diff --git a/nativelink-service/tests/store_overlap_rules_test.rs b/nativelink-service/tests/store_overlap_rules_test.rs index 8506ad02b..c48b6bf73 100644 --- a/nativelink-service/tests/store_overlap_rules_test.rs +++ b/nativelink-service/tests/store_overlap_rules_test.rs @@ -21,7 +21,7 @@ use nativelink_store::default_store_factory::make_and_add_store_to_manager; use nativelink_store::store_manager::StoreManager; #[nativelink_test] -async fn same_datasource() -> Result<(), Error> { +async fn same_datasource_disallowed_simple() -> Result<(), Error> { let store_manager = Arc::new(StoreManager::new()); assert!(make_and_add_store_to_manager( "main_cas", @@ -32,6 +32,21 @@ async fn same_datasource() -> Result<(), Error> { .await .is_ok()); + assert!(make_and_add_store_to_manager( + "main_ac", + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await + .is_err()); + + Ok(()) +} + +#[nativelink_test] +async fn same_datasource_disallowed_complex() -> Result<(), Error> { + let store_manager = Arc::new(StoreManager::new()); assert!(make_and_add_store_to_manager( "main_cas", &StoreSpec::memory(MemorySpec::default()), @@ -39,6 +54,15 @@ async fn same_datasource() -> Result<(), Error> { None, ) .await + .is_ok()); + + assert!(make_and_add_store_to_manager( + "main_ac", + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await .is_err()); Ok(()) diff --git a/nativelink-store/tests/redis_store_test.rs b/nativelink-store/tests/redis_store_test.rs index 48f1cbdfa..2b2d71c85 100644 --- a/nativelink-store/tests/redis_store_test.rs +++ b/nativelink-store/tests/redis_store_test.rs @@ -883,7 +883,7 @@ async fn test_redis_fingerprint_metric() -> Result<(), Error> { )) }; - store_manager.add_store("redis_store", store); + store_manager.add_store("redis_store", store).unwrap(); }; let root_metrics = Arc::new(RwLock::new(RootMetricsTest { diff --git a/nativelink-store/tests/ref_store_test.rs b/nativelink-store/tests/ref_store_test.rs index 18b1463a0..a26752a0d 100644 --- a/nativelink-store/tests/ref_store_test.rs +++ b/nativelink-store/tests/ref_store_test.rs @@ -15,9 +15,10 @@ use std::ptr::from_ref; use std::sync::Arc; -use nativelink_config::stores::{MemorySpec, RefSpec}; +use nativelink_config::stores::{MemorySpec, RefSpec, StoreSpec}; use nativelink_error::Error; use nativelink_macro::nativelink_test; +use nativelink_store::default_store_factory::make_and_add_store_to_manager; use nativelink_store::memory_store::MemoryStore; use nativelink_store::ref_store::RefStore; use nativelink_store::store_manager::StoreManager; @@ -27,19 +28,24 @@ use pretty_assertions::assert_eq; const VALID_HASH1: &str = "0123456789abcdef000000000000000000010000000000000123456789abcdef"; -fn setup_stores() -> (Arc, Store, Store) { +async fn setup_stores() -> (Arc, Store, Store) { let store_manager = Arc::new(StoreManager::new()); - let memory_store = Store::new(MemoryStore::new(&MemorySpec::default())); - store_manager.add_store("foo", memory_store.clone()); + let memory_store_spec = StoreSpec::memory(MemorySpec::default()); + + make_and_add_store_to_manager("foo", &memory_store_spec, &store_manager, None) + .await + .unwrap(); + + let ref_store_spec = StoreSpec::ref_store(RefSpec { name: "foo".into() }); + + make_and_add_store_to_manager("bar", &ref_store_spec, &store_manager, None) + .await + .unwrap(); + + let memory_store = store_manager.get_store("foo").unwrap(); + let ref_store = store_manager.get_store("bar").unwrap(); - let ref_store = Store::new(RefStore::new( - &RefSpec { - name: "foo".to_string(), - }, - Arc::downgrade(&store_manager), - )); - store_manager.add_store("bar", ref_store.clone()); (store_manager, memory_store, ref_store) } @@ -47,7 +53,7 @@ fn setup_stores() -> (Arc, Store, Store) { async fn has_test() -> Result<(), Error> { const VALUE1: &str = "13"; - let (_store_manager, memory_store, ref_store) = setup_stores(); + let (_store_manager, memory_store, ref_store) = setup_stores().await; { // Insert data into memory store. @@ -77,7 +83,7 @@ async fn has_test() -> Result<(), Error> { async fn get_test() -> Result<(), Error> { const VALUE1: &str = "13"; - let (_store_manager, memory_store, ref_store) = setup_stores(); + let (_store_manager, memory_store, ref_store) = setup_stores().await; { // Insert data into memory store. @@ -108,7 +114,7 @@ async fn get_test() -> Result<(), Error> { async fn update_test() -> Result<(), Error> { const VALUE1: &str = "13"; - let (_store_manager, memory_store, ref_store) = setup_stores(); + let (_store_manager, memory_store, ref_store) = setup_stores().await; { // Insert data into ref_store. @@ -140,7 +146,7 @@ async fn inner_store_test() -> Result<(), Error> { let store_manager = Arc::new(StoreManager::new()); let memory_store = Store::new(MemoryStore::new(&MemorySpec::default())); - store_manager.add_store("mem_store", memory_store.clone()); + store_manager.add_store("mem_store", memory_store.clone())?; let ref_store_inner = Store::new(RefStore::new( &RefSpec { @@ -148,7 +154,7 @@ async fn inner_store_test() -> Result<(), Error> { }, Arc::downgrade(&store_manager), )); - store_manager.add_store("ref_store_inner", ref_store_inner.clone()); + store_manager.add_store("ref_store_inner", ref_store_inner.clone())?; let ref_store_outer = Store::new(RefStore::new( &RefSpec { @@ -156,7 +162,7 @@ async fn inner_store_test() -> Result<(), Error> { }, Arc::downgrade(&store_manager), )); - store_manager.add_store("ref_store_outer", ref_store_outer.clone()); + store_manager.add_store("ref_store_outer", ref_store_outer.clone())?; // Ensure the result of inner_store() points to exact same memory store. assert_eq!( From 9adeef34100eb1826b3d969feda0a9dc0e67db44 Mon Sep 17 00:00:00 2001 From: Mike Keen Date: Sun, 29 Dec 2024 15:25:20 -0500 Subject: [PATCH 09/11] use working dir for filesystem store in test --- nativelink-service/tests/ac_server_test.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/nativelink-service/tests/ac_server_test.rs b/nativelink-service/tests/ac_server_test.rs index 5531456eb..a6800f1ea 100644 --- a/nativelink-service/tests/ac_server_test.rs +++ b/nativelink-service/tests/ac_server_test.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::env; use std::pin::Pin; use std::sync::Arc; @@ -61,11 +62,21 @@ async fn make_store_manager() -> Result, Error> { ) .await?; + let current_dir = env::current_dir().expect("Failed to get current directory"); + make_and_add_store_to_manager( "main_ac", &StoreSpec::filesystem(FilesystemSpec { - content_path: "/tmp/nativelink/testing/ac/content_path".into(), - temp_path: "/tmp/nativelink/testing/ac/tmp_path".into(), + content_path: current_dir + .join("testing_data/ac/content_path") + .into_os_string() + .into_string() + .unwrap(), + temp_path: current_dir + .join("testing_data/ac/tmp_path") + .into_os_string() + .into_string() + .unwrap(), read_buffer_size: 100, block_size: 100, eviction_policy: Some(EvictionPolicy { From 0022bdfbda24dbacfa09e3194b99e757011e04b2 Mon Sep 17 00:00:00 2001 From: Mike Keen Date: Sun, 29 Dec 2024 16:14:13 -0500 Subject: [PATCH 10/11] use defaults for Filesystem store --- nativelink-service/tests/ac_server_test.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/nativelink-service/tests/ac_server_test.rs b/nativelink-service/tests/ac_server_test.rs index a6800f1ea..365a68ca6 100644 --- a/nativelink-service/tests/ac_server_test.rs +++ b/nativelink-service/tests/ac_server_test.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use bytes::BytesMut; use maplit::hashmap; -use nativelink_config::stores::{EvictionPolicy, FilesystemSpec, MemorySpec, StoreSpec}; +use nativelink_config::stores::{FilesystemSpec, MemorySpec, StoreSpec}; use nativelink_error::Error; use nativelink_macro::nativelink_test; use nativelink_proto::build::bazel::remote::execution::v2::action_cache_server::ActionCache; @@ -49,6 +49,7 @@ async fn insert_into_store( let data_len = store_data.len(); let digest = DigestInfo::try_new(hash, action_size)?; store.update_oneshot(digest, store_data.freeze()).await?; + Ok(data_len.try_into().unwrap()) } @@ -64,6 +65,8 @@ async fn make_store_manager() -> Result, Error> { let current_dir = env::current_dir().expect("Failed to get current directory"); + let default_filesystem_spec = FilesystemSpec::default(); + make_and_add_store_to_manager( "main_ac", &StoreSpec::filesystem(FilesystemSpec { @@ -77,14 +80,9 @@ async fn make_store_manager() -> Result, Error> { .into_os_string() .into_string() .unwrap(), - read_buffer_size: 100, - block_size: 100, - eviction_policy: Some(EvictionPolicy { - max_bytes: 1_000_000_000, - evict_bytes: 10000, - max_seconds: 500, - max_count: 1_000_000, - }), + read_buffer_size: default_filesystem_spec.read_buffer_size, + eviction_policy: default_filesystem_spec.eviction_policy, + block_size: default_filesystem_spec.block_size, }), &store_manager, None, From aff36b83090fa440aaa3ce90c6b5baa688f1162c Mon Sep 17 00:00:00 2001 From: Mike Keen Date: Mon, 30 Dec 2024 16:19:50 -0500 Subject: [PATCH 11/11] still not final -- will remove ptr check -- not needed thanks to rust's model anyway --- nativelink-config/src/stores.rs | 7 ------ .../tests/.#store_overlap_rules_test.rs | 1 + nativelink-service/tests/ac_server_test.rs | 23 ++--------------- .../tests/store_overlap_rules_test.rs | 25 ++----------------- nativelink-store/src/store_manager.rs | 17 +++++++++++-- src/bin/nativelink.rs | 10 ++++++++ 6 files changed, 30 insertions(+), 53 deletions(-) create mode 120000 nativelink-service/tests/.#store_overlap_rules_test.rs diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs index 75e9c4b6a..c6293a387 100644 --- a/nativelink-config/src/stores.rs +++ b/nativelink-config/src/stores.rs @@ -437,7 +437,6 @@ impl StoreSpec { // disallow_duplicates_digest() on it. Returns `None` for stores that are not being enforced unique. pub fn disallow_duplicates_digest(&self) -> Option { match self { - Self::memory(spec) => Some(spec.disallow_duplicates_digest()), Self::experimental_s3_store(spec) => Some(spec.disallow_duplicates_digest()), Self::filesystem(spec) => Some(spec.disallow_duplicates_digest()), Self::grpc(spec) => Some(spec.disallow_duplicates_digest()), @@ -556,12 +555,6 @@ pub struct MemorySpec { pub eviction_policy: Option, } -impl MemorySpec { - pub fn disallow_duplicates_digest(&self) -> String { - "InMemoryStore".into() - } -} - #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct DedupSpec { diff --git a/nativelink-service/tests/.#store_overlap_rules_test.rs b/nativelink-service/tests/.#store_overlap_rules_test.rs new file mode 120000 index 000000000..7e7432617 --- /dev/null +++ b/nativelink-service/tests/.#store_overlap_rules_test.rs @@ -0,0 +1 @@ +mkeen@lab.245673:1735391239 \ No newline at end of file diff --git a/nativelink-service/tests/ac_server_test.rs b/nativelink-service/tests/ac_server_test.rs index 365a68ca6..918a10afb 100644 --- a/nativelink-service/tests/ac_server_test.rs +++ b/nativelink-service/tests/ac_server_test.rs @@ -12,13 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::env; use std::pin::Pin; use std::sync::Arc; use bytes::BytesMut; use maplit::hashmap; -use nativelink_config::stores::{FilesystemSpec, MemorySpec, StoreSpec}; +use nativelink_config::stores::{MemorySpec, StoreSpec}; use nativelink_error::Error; use nativelink_macro::nativelink_test; use nativelink_proto::build::bazel::remote::execution::v2::action_cache_server::ActionCache; @@ -63,27 +62,9 @@ async fn make_store_manager() -> Result, Error> { ) .await?; - let current_dir = env::current_dir().expect("Failed to get current directory"); - - let default_filesystem_spec = FilesystemSpec::default(); - make_and_add_store_to_manager( "main_ac", - &StoreSpec::filesystem(FilesystemSpec { - content_path: current_dir - .join("testing_data/ac/content_path") - .into_os_string() - .into_string() - .unwrap(), - temp_path: current_dir - .join("testing_data/ac/tmp_path") - .into_os_string() - .into_string() - .unwrap(), - read_buffer_size: default_filesystem_spec.read_buffer_size, - eviction_policy: default_filesystem_spec.eviction_policy, - block_size: default_filesystem_spec.block_size, - }), + &StoreSpec::memory(MemorySpec::default()), &store_manager, None, ) diff --git a/nativelink-service/tests/store_overlap_rules_test.rs b/nativelink-service/tests/store_overlap_rules_test.rs index c48b6bf73..385fea763 100644 --- a/nativelink-service/tests/store_overlap_rules_test.rs +++ b/nativelink-service/tests/store_overlap_rules_test.rs @@ -39,31 +39,10 @@ async fn same_datasource_disallowed_simple() -> Result<(), Error> { None, ) .await - .is_err()); - - Ok(()) -} - -#[nativelink_test] -async fn same_datasource_disallowed_complex() -> Result<(), Error> { - let store_manager = Arc::new(StoreManager::new()); - assert!(make_and_add_store_to_manager( - "main_cas", - &StoreSpec::memory(MemorySpec::default()), - &store_manager, - None, - ) - .await .is_ok()); - assert!(make_and_add_store_to_manager( - "main_ac", - &StoreSpec::memory(MemorySpec::default()), - &store_manager, - None, - ) - .await - .is_err()); + let existing_cas = store_manager.get_store("main_cas").unwrap(); + store_manager.add_store("different_store", existing_cas)?; Ok(()) } diff --git a/nativelink-store/src/store_manager.rs b/nativelink-store/src/store_manager.rs index 46ba2c0a3..2ef5c5f96 100644 --- a/nativelink-store/src/store_manager.rs +++ b/nativelink-store/src/store_manager.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::ptr; use nativelink_error::{make_err, Code, Error}; use nativelink_metric::{MetricsComponent, RootMetricsComponent}; @@ -36,14 +37,26 @@ impl StoreManager { pub fn add_store(&self, name: &str, store: Store) -> Result<(), Error> { let mut stores = self.stores.write(); + if stores.contains_key(name) { return Err(make_err!( Code::AlreadyExists, - "A store with the name '{}' already exists", + "a store with the name '{}' already exists", name )); } - stores.insert(name.to_string(), store); + + for existing_store in stores.values().into_iter() { + if ptr::eq(&store, existing_store) { + return Err(make_err!( + Code::AlreadyExists, + "an instance of this store is already managed" + )); + } + } + + stores.insert(name.into(), store); + Ok(()) } diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index 5896b8d4e..81f657741 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -12,11 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. + + + use std::collections::{HashMap, HashSet}; + + + use std::net::SocketAddr; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; + + + + use async_lock::Mutex as AsyncMutex; use axum::Router; use clap::Parser;