Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-768 Store config uniqueness enforced by StoreManager #1555

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
57 changes: 57 additions & 0 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,21 @@ pub enum StoreSpec {
noop(NoopSpec),
}

impl StoreSpec {
// To enforce no duplicate connection configs for a store, add it to the matcher and implement
// disallow_duplicates_digest() on it. Returns `None` for stores that are not being enforced unique.
pub fn disallow_duplicates_digest(&self) -> Option<String> {
match self {
Self::memory(spec) => Some(spec.disallow_duplicates_digest()),
Self::experimental_s3_store(spec) => Some(spec.disallow_duplicates_digest()),
Self::filesystem(spec) => Some(spec.disallow_duplicates_digest()),
Self::grpc(spec) => Some(spec.disallow_duplicates_digest()),
Self::redis_store(spec) => Some(spec.disallow_duplicates_digest()),
_ => None,
}
}
}

/// Configuration for an individual shard of the store.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -514,6 +529,12 @@ pub struct FilesystemSpec {
pub block_size: u64,
}

impl FilesystemSpec {
fn disallow_duplicates_digest(&self) -> String {
format!("{}{}", self.content_path, self.temp_path)
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct FastSlowSpec {
Expand All @@ -535,6 +556,12 @@ pub struct MemorySpec {
pub eviction_policy: Option<EvictionPolicy>,
}

impl MemorySpec {
pub fn disallow_duplicates_digest(&self) -> String {
"InMemoryStore".into()
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct DedupSpec {
Expand Down Expand Up @@ -787,6 +814,14 @@ pub struct S3Spec {
pub disable_http2: bool,
}

impl S3Spec {
pub fn disallow_duplicates_digest(&self) -> String {
let key_prefix = self.key_prefix.as_deref().unwrap_or_default();

format!("{}{}{}", self.region, self.bucket, key_prefix)
}
}

#[allow(non_camel_case_types)]
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub enum StoreType {
Expand Down Expand Up @@ -852,6 +887,22 @@ pub struct GrpcSpec {
pub connections_per_endpoint: usize,
}

impl GrpcSpec {
// todo: could improve duplication detection to individual endpoints to disallow accidental re-use
fn disallow_duplicates_digest(&self) -> String {
format!(
"{}{}",
self.instance_name,
self.endpoints
.clone()
.into_iter()
.map(|endpoint| endpoint.address)
.collect::<Vec<String>>()
.join(",")
)
}
}

/// The possible error codes that might occur on an upstream request.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum ErrorCode {
Expand Down Expand Up @@ -993,6 +1044,12 @@ pub struct RedisSpec {
pub retry: Retry,
}

impl RedisSpec {
fn disallow_duplicates_digest(&self) -> String {
format!("{}{}", self.addresses.clone().join(","), self.key_prefix)
}
}

#[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum RedisMode {
Expand Down
1 change: 1 addition & 0 deletions nativelink-service/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ rust_test_suite(
"tests/bytestream_server_test.rs",
"tests/cas_server_test.rs",
"tests/worker_api_server_test.rs",
"tests/store_overlap_rules_test.rs",
],
proc_macro_deps = [
"//nativelink-macro",
Expand Down
32 changes: 15 additions & 17 deletions nativelink-service/tests/ac_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{
digest_function, ActionResult, Digest, GetActionResultRequest, UpdateActionResultRequest,
};
use nativelink_service::ac_server::AcServer;
use nativelink_store::default_store_factory::store_factory;
use nativelink_store::default_store_factory::make_and_add_store_to_manager;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::common::DigestInfo;
use nativelink_util::store_trait::StoreLike;
Expand Down Expand Up @@ -53,24 +53,22 @@ async fn insert_into_store<T: Message>(

async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
let store_manager = Arc::new(StoreManager::new());
store_manager.add_store(
make_and_add_store_to_manager(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding, what is the reason why you moved away from store_factory rather than modifying store_factory?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still using store_factory to build the Store, but wrapping up in make_and_add_store_to_manager because it makes building and adding a Store to a StoreManager a single step from a public-facing perspective.

"main_cas",
store_factory(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?,
);
store_manager.add_store(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?;

make_and_add_store_to_manager(
"main_ac",
store_factory(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?,
);
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?;

Ok(store_manager)
}

Expand Down
17 changes: 8 additions & 9 deletions nativelink-service/tests/bep_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use nativelink_proto::google::devtools::build::v1::{
PublishBuildToolEventStreamRequest, PublishLifecycleEventRequest, StreamId,
};
use nativelink_service::bep_server::BepServer;
use nativelink_store::default_store_factory::store_factory;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems heavy at first read but maybe it is an improvement to both the naming and the functionality. store_factory as a name says nothing about what its function is, but developers could go look at the definition. make_and_add_store_to_manager is definitely clearer.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

store_factory is still used but I made private and think that make_and_add_store_to_manager is better as a front-facing API because it makes and associates the Store with a StoreManager -- rather than leaving them as separate steps. The door is still open to manually build any Store instance though and call add_store on StoreManager.

use nativelink_store::default_store_factory::make_and_add_store_to_manager;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::buf_channel::make_buf_channel_pair;
use nativelink_util::channel_body_for_tests::ChannelBody;
Expand All @@ -52,15 +52,14 @@ const BEP_STORE_NAME: &str = "main_bep";
/// Utility function to construct a [`StoreManager`]
async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
let store_manager = Arc::new(StoreManager::new());
store_manager.add_store(
make_and_add_store_to_manager(
BEP_STORE_NAME,
store_factory(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?,
);
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?;

Ok(store_manager)
}

Expand Down
17 changes: 8 additions & 9 deletions nativelink-service/tests/bytestream_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use nativelink_proto::google::bytestream::{
QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, WriteRequest, WriteResponse,
};
use nativelink_service::bytestream_server::ByteStreamServer;
use nativelink_store::default_store_factory::store_factory;
use nativelink_store::default_store_factory::make_and_add_store_to_manager;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::channel_body_for_tests::ChannelBody;
use nativelink_util::common::{encode_stream_proto, DigestInfo};
Expand All @@ -58,15 +58,14 @@ const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789a

async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
let store_manager = Arc::new(StoreManager::new());
store_manager.add_store(
make_and_add_store_to_manager(
"main_cas",
store_factory(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?,
);
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?;

Ok(store_manager)
}

Expand Down
17 changes: 8 additions & 9 deletions nativelink-service/tests/cas_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{
use nativelink_proto::google::rpc::Status as GrpcStatus;
use nativelink_service::cas_server::CasServer;
use nativelink_store::ac_utils::serialize_and_upload_message;
use nativelink_store::default_store_factory::store_factory;
use nativelink_store::default_store_factory::make_and_add_store_to_manager;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::common::DigestInfo;
use nativelink_util::digest_hasher::DigestHasherFunc;
Expand All @@ -47,15 +47,14 @@ const BAD_HASH: &str = "BAD_HASH";

async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
let store_manager = Arc::new(StoreManager::new());
store_manager.add_store(
make_and_add_store_to_manager(
"main_cas",
store_factory(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?,
);
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?;

Ok(store_manager)
}

Expand Down
Loading
Loading