Skip to content

Commit

Permalink
http shard client (#7)
Browse files Browse the repository at this point in the history
* import cas_types definitions

* http based shard client

* use cas_types::Key

* add test

* fix linting
  • Loading branch information
seanses authored Sep 14, 2024
1 parent 2f7116a commit d553005
Show file tree
Hide file tree
Showing 17 changed files with 388 additions and 710 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ debug/
# VS Code configs
.vscode
venv
**/*.env
76 changes: 29 additions & 47 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ resolver = "2"
members = [
"cache",
"cas_client",
"cas_object",
"cas_types",
"data",
"error_printer",
"file_utils",
Expand All @@ -17,8 +19,6 @@ members = [
"shard_client",
"utils",
"xet_error",
"cas_object",
"cas_types",
]

exclude = [
Expand Down
23 changes: 2 additions & 21 deletions data/src/remote_shard_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,6 @@ impl RemoteShardInterface {
}

let salt = self.repo_salt()?;
let cas = self.cas()?;
let cas_ref = &cas;
let shard_client = self.shard_client()?;
let shard_client_ref = &shard_client;
let shard_prefix = self.shard_prefix.clone();
Expand All @@ -352,25 +350,10 @@ impl RemoteShardInterface {
&si.shard_hash
);
let data = std::fs::read(&si.path)?;
let data_len = data.len();
// Upload the shard.
cas_ref
.put_bypass_stage(
shard_prefix_ref,
&si.shard_hash,
data,
vec![data_len as u64],
)
.await?;

debug!(
"Registering shard {shard_prefix_ref}/{:?} with shard server.",
&si.shard_hash
);

// That succeeded if we made it here, so now try to sync things.
// Upload the shard.
shard_client_ref
.register_shard_with_salt(shard_prefix_ref, &si.shard_hash, false, &salt)
.upload_shard(&self.shard_prefix, &si.shard_hash, false, &data, &salt)
.await?;

info!(
Expand All @@ -388,8 +371,6 @@ impl RemoteShardInterface {
parutils::ParallelError::TaskError(e) => e,
})?;

cas.flush().await?;

Ok(())
}

Expand Down
12 changes: 2 additions & 10 deletions data/src/shard_interface.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use super::configurations::{Endpoint::*, StorageConfig};
use super::errors::Result;
use crate::constants::XET_VERSION;
use mdb_shard::ShardFileManager;
use shard_client::{GrpcShardClient, LocalShardClient, ShardClientInterface};
use shard_client::{HttpShardClient, LocalShardClient, ShardClientInterface};
use std::sync::Arc;
use tracing::{info, warn};

Expand Down Expand Up @@ -40,14 +39,7 @@ pub async fn create_shard_client(
) -> Result<Arc<dyn ShardClientInterface>> {
info!("Shard endpoint = {:?}", shard_storage_config.endpoint);
let client: Arc<dyn ShardClientInterface> = match &shard_storage_config.endpoint {
Server(endpoint) => {
let shard_connection_config = shard_client::ShardConnectionConfig {
endpoint: endpoint.clone(),
user_id: shard_storage_config.auth.user_id.clone(),
git_xet_version: XET_VERSION.to_string(),
};
Arc::new(GrpcShardClient::from_config(shard_connection_config).await?)
}
Server(endpoint) => Arc::new(HttpShardClient::new(endpoint)),
FileSystem(path) => Arc::new(LocalShardClient::new(path).await?),
};

Expand Down
Loading

0 comments on commit d553005

Please sign in to comment.