Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
add s3 tests
Browse files Browse the repository at this point in the history
  • Loading branch information
J-HowHuang committed Apr 2, 2024
1 parent 628802f commit e7fe9d0
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 43 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/server-integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
- 'main'
- 'cp2-server*'
- 'rust-integration-test'
- 'server-*'
pull_request:
branches:
- 'main'
Expand All @@ -21,12 +22,16 @@ jobs:
continue-on-error: false
env:
SERVER_ROOT: ${{ github.workspace }}/server
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
steps:
- uses: actions/checkout@v3
- name: Install Toolchain
run: rustup update stable && rustup default stable && rustup component add rustfmt
- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
- name: Format check
run: cargo fmt --all -- --check
- name: setup-redis
uses: shogo82148/actions-setup-redis@v1
with:
Expand Down
17 changes: 0 additions & 17 deletions server/debug.sh

This file was deleted.

1 change: 1 addition & 0 deletions server/redis.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
cluster-enabled yes
cluster-node-timeout 5000
appendonly no
save ""
File renamed without changes.
41 changes: 27 additions & 14 deletions server/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ pub struct DiskCache {
access_order: VecDeque<String>,
}

/// Response type for the cache's file-lookup path. The `rocket::Responder`
/// derive maps each variant straight to an HTTP status code.
#[derive(rocket::Responder)]
pub enum GetFileResult {
    /// The file is available locally — stream it back with 200 OK.
    #[response(status = 200)]
    Hit(NamedFile),
    /// The file is owned by another node — 303 See Other to that node's URL.
    #[response(status = 303)]
    Redirect(Box<Redirect>), // Box this Redirect to avoid [warn] clippy::large_enum_variant
    /// The file could not be fetched from S3 — 404 carrying the requested uid.
    /// NOTE(review): also returned when a cached file fails to open from local
    /// disk, so a 404 does not strictly mean "absent from S3" — confirm intent.
    #[response(status = 404)]
    NotFoundOnS3(String),
    /// Server-side setup failed (e.g. updating the Redis slot-to-node
    /// mapping) — 500 with a description of the error.
    #[response(status = 500)]
    InitFailed(String),
}

// DiskCache Implementation ---------------------------------------------------

impl DiskCache {
Expand All @@ -51,7 +63,7 @@ impl DiskCache {
uid: PathBuf,
connector: Arc<dyn StorageConnector + Send + Sync>,
redis_read: &RwLockReadGuard<'_, RedisServer>,
) -> Result<NamedFile, Redirect> {
) -> GetFileResult {
let uid_str = uid.into_os_string().into_string().unwrap();
let mut cache = cache.lock().await;
let redirect = redis_read.location_lookup(uid_str.clone()).await;
Expand All @@ -66,16 +78,13 @@ impl DiskCache {
url.set_port(Some(p + PORT_OFFSET_TO_WEB_SERVER)).unwrap();
url.set_path(&format!("s3/{}", &uid_str)[..]);
debug!("tell client to redirect to {}", url.to_string());
return Err(Redirect::to(url.to_string()));
return GetFileResult::Redirect(Box::new(Redirect::to(url.to_string())));
}
let file_name = if let Some(redis_res) = redis_read.get_file(uid_str.clone()).await {
debug!("{} found in cache", &uid_str);
redis_res
} else {
match cache
.get_s3_file_to_cache(&uid_str, connector)
.await
{
match cache.get_s3_file_to_cache(&uid_str, connector).await {
Ok(local_file_name) => {
debug!("{} fetched from S3", &uid_str);
cache.ensure_capacity(redis_read).await;
Expand All @@ -89,17 +98,18 @@ impl DiskCache {
}
Err(e) => {
info!("{}", e.to_string());
return Err(Redirect::to("/not_found_on_S3"));
return GetFileResult::NotFoundOnS3(uid_str);
}
}
};
let file_name_str = file_name.to_str().unwrap_or_default().to_string();
debug!("get_file: {}", file_name_str);
cache.update_access(&file_name_str);
let cache_file_path = cache.cache_dir.join(file_name);
return NamedFile::open(cache_file_path)
.await
.map_err(|_| Redirect::to("/not_found_on_this_disk"));
match NamedFile::open(cache_file_path).await {
Ok(x) => GetFileResult::Hit(x),
Err(_) => GetFileResult::NotFoundOnS3(uid_str),
}
}

async fn get_s3_file_to_cache(
Expand Down Expand Up @@ -183,7 +193,7 @@ impl ConcurrentDiskCache {
&self,
uid: PathBuf,
connector: Arc<dyn StorageConnector + Send + Sync>,
) -> Result<NamedFile, Redirect> {
) -> GetFileResult {
let uid = uid.into_os_string().into_string().unwrap();
// Use read lock for read operations
let redis_read = self.redis.read().await; // Acquiring a read lock
Expand All @@ -192,8 +202,10 @@ impl ConcurrentDiskCache {

let mut redis_write = self.redis.write().await; // Acquiring a write lock
if let Err(e) = redis_write.update_slot_to_node_mapping().await {
eprintln!("Error updating slot-to-node mapping: {:?}", e);
return Err(Redirect::to("/error_updating_mapping"));
return GetFileResult::InitFailed(format!(
"Error updating slot-to-node mapping: {:?}",
e
));
}
redis_write.get_myid(self.redis_port);
redis_write.mapping_initialized = true;
Expand All @@ -207,7 +219,8 @@ impl ConcurrentDiskCache {
let shard = &self.shards[shard_index];
// Debug message showing shard selection
debug!("Selected shard index: {} for uid: {}", shard_index, &uid);
let result = DiskCache::get_file(shard.clone(), uid.into(), connector.clone(), &redis_read).await;
let result =
DiskCache::get_file(shard.clone(), uid.into(), connector.clone(), &redis_read).await;
drop(redis_read);
debug!("{}", self.get_stats().await);
result
Expand Down
6 changes: 3 additions & 3 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ extern crate log;
use crate::storage::mock_storage_connector::MockS3StorageConnector;
use crate::storage::s3_storage_connector::S3StorageConnector;
use crate::storage::storage_connector::StorageConnector;
use rocket::{fs::NamedFile, response::Redirect, State};
use rocket::State;
use rocket::{get, post, routes, Rocket};
use std::path::PathBuf;
use std::sync::Arc;
Expand All @@ -25,7 +25,7 @@ async fn get_file(
uid: PathBuf,
cache: &State<Arc<ConcurrentDiskCache>>,
s3_connector: &State<Arc<dyn StorageConnector + Send + Sync>>,
) -> Result<NamedFile, Redirect> {
) -> cache::GetFileResult {
cache
.inner()
.clone()
Expand Down Expand Up @@ -76,7 +76,7 @@ impl ServerNode {

let cache_manager = Arc::new(ConcurrentDiskCache::new(
PathBuf::from(&config.cache_dir),
6, // [TODO] make this configurable
6, // [TODO] make this configurable
vec![format!("redis://0.0.0.0:{}", config.redis_port)],
config.redis_port,
));
Expand Down
1 change: 0 additions & 1 deletion server/src/storage/s3_storage_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::path::{Path, PathBuf};
use tokio::fs::File;
use tokio::io::AsyncWriteExt;


use super::storage_connector::StorageConnector;

pub struct S3StorageConnector {
Expand Down
24 changes: 20 additions & 4 deletions server/tests/test_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod utils;

#[test]
fn test_healthy() {
let (_, [client_1, client_2, client_3]) = utils::launch_server_node_size_3();
let (_, [client_1, client_2, client_3]) = utils::launch_server_node_size_3(true);

let response = client_1.get("/").dispatch();
assert_eq!(response.status(), Status::Ok);
Expand All @@ -21,7 +21,7 @@ fn test_healthy() {

#[test]
fn test_clear() {
let (_, [client_1, _, _]) = utils::launch_server_node_size_3();
let (_, [client_1, _, _]) = utils::launch_server_node_size_3(true);

let _ = client_1.get("/s3/test2.txt").dispatch();
let response = client_1.get("/stats").dispatch();
Expand All @@ -37,7 +37,7 @@ fn test_clear() {

#[test]
fn test_get_file() {
let (_, [client_1, client_2, client_3]) = utils::launch_server_node_size_3();
let (_, [client_1, client_2, client_3]) = utils::launch_server_node_size_3(true);
let response = client_1.get("/s3/test1.txt").dispatch();
assert_eq!(response.status(), Status::SeeOther);
let response = client_1.get("/s3/test2.txt").dispatch();
Expand All @@ -61,7 +61,7 @@ fn test_get_file() {

#[test]
fn test_evict() {
let (_, [client_1, _, _]) = utils::launch_server_node_size_3();
let (_, [client_1, _, _]) = utils::launch_server_node_size_3(true);

let response = client_1.post("/clear").dispatch();
assert_eq!(response.status(), Status::Ok);
Expand All @@ -76,3 +76,19 @@ fn test_evict() {
assert!(!stats.contains("test6"));
assert!(stats.contains("test8"));
}

/// Fetching an object that exists in the bucket through the real S3
/// connector should return 200, and clearing the cache afterwards
/// should succeed as well.
#[test]
fn test_s3_connector() {
    let (_, [client, _, _]) = utils::launch_server_node_size_3(false);

    let fetch = client.get("/s3/test2.txt").dispatch();
    assert_eq!(fetch.status(), Status::Ok);

    let clear = client.post("/clear").dispatch();
    assert_eq!(clear.status(), Status::Ok);
}

/// Requesting a key that is not present in the bucket must come back 404.
#[test]
fn test_s3_connector_file_not_exist() {
    let (_, [_, client, _]) = utils::launch_server_node_size_3(false);
    let status = client.get("/s3/jhow.sucks").dispatch().status();
    assert_eq!(status, Status::NotFound);
}
38 changes: 34 additions & 4 deletions server/tests/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use istziio_server_node::server::{ServerConfig, ServerNode};
use rocket::local::blocking::Client;
use std::env;

pub fn get_server_config_mocks3(redis_port: u16) -> ServerConfig {
ServerConfig {
Expand All @@ -13,10 +14,39 @@ pub fn get_server_config_mocks3(redis_port: u16) -> ServerConfig {
}
}

pub fn launch_server_node_size_3() -> ([ServerNode; 3], [Client; 3]) {
let config_1 = get_server_config_mocks3(6379);
let config_2 = get_server_config_mocks3(6380);
let config_3 = get_server_config_mocks3(6381);
/// Build a per-node `ServerConfig` that uses the real S3 bucket (the mock
/// endpoint is disabled) with the supplied AWS credentials. The cache
/// directory is derived from the redis port so nodes do not collide.
pub fn get_server_config_s3(
    redis_port: u16,
    aws_access_key: String,
    aws_secret_key: String,
) -> ServerConfig {
    let cache_dir = format!("./cache_{}", redis_port);
    ServerConfig {
        redis_port,
        cache_dir,
        // `None` => do not route through the mock S3 endpoint.
        use_mock_s3_endpoint: None,
        bucket: Some(String::from("istziio-bucket")),
        region_name: Some(String::from("us-east-1")),
        access_key: Some(aws_access_key),
        secret_key: Some(aws_secret_key),
    }
}

pub fn launch_server_node_size_3(mocks3: bool) -> ([ServerNode; 3], [Client; 3]) {
let (config_1, config_2, config_3) = if mocks3 {
(
get_server_config_mocks3(6379),
get_server_config_mocks3(6380),
get_server_config_mocks3(6381),
)
} else {
let access_key = env::var("AWS_ACCESS_KEY_ID").expect("$AWS_ACCESS_KEY_ID not set!");
let secret_key =
env::var("AWS_SECRET_ACCESS_KEY").expect("$AWS_SECRET_ACCESS_KEY not set!");
(
get_server_config_s3(6379, access_key.clone(), secret_key.clone()),
get_server_config_s3(6380, access_key.clone(), secret_key.clone()),
get_server_config_s3(6381, access_key.clone(), secret_key.clone()),
)
};

let node_1 = ServerNode::new(config_1);
let node_2 = ServerNode::new(config_2);
Expand Down

0 comments on commit e7fe9d0

Please sign in to comment.