Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
enhance test for async api
Browse files Browse the repository at this point in the history
  • Loading branch information
ScottLinnn committed Apr 30, 2024
1 parent 6fe74d2 commit 225c87a
Showing 1 changed file with 59 additions and 11 deletions.
70 changes: 59 additions & 11 deletions client/src/storage_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl StorageClientImpl {

file.write_all(&file_contents)?;
println!("parquet written to {}", file_path);
// STREAM VERSION CODE!
// STREAM VERSION CODE - NOT IN USE!

// let mut file_path = self.local_cache.clone();

Expand Down Expand Up @@ -279,14 +279,21 @@ mod tests {
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]);
let ids = Int32Array::from(vec![1, 2, 3, 4, 5]);
let names = StringArray::from(vec!["Alice", "Bob", "Carol", "Dave", "Eve"]);
let row_num = 10;
let ids_vec = (1..=row_num as i32).collect::<Vec<i32>>();

// names is a vector of row_num names "testrow_{id}", each element is a &str
let names_vec = (1..=row_num)
.map(|id| format!("testrow_{}", id))
.collect::<Vec<String>>();
let ids = Int32Array::from(ids_vec);
let names = StringArray::from(names_vec);
let batch =
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(ids), Arc::new(names)]).unwrap();
batch
}

fn create_sample_parquet_file(file_name: &str) -> anyhow::Result<()> {
fn create_sample_parquet_file(file_name: &str, row_num: usize) -> anyhow::Result<()> {
let mut cache_path = StorageClientImpl::local_cache_path();
if !Path::new(&cache_path).exists() {
fs::create_dir_all(&cache_path).unwrap();
Expand All @@ -306,8 +313,15 @@ mod tests {
let props: WriterProperties = WriterProperties::builder().build();
let mut writer = SerializedFileWriter::new(file, schema, Arc::new(props))?;

let ids = vec![1, 2, 3, 4, 5];
let names = vec!["Alice", "Bob", "Carol", "Dave", "Eve"];
// ids is a vector of row_num elements, each element is an i32
let ids = (1..=row_num as i32).collect::<Vec<i32>>();

// names is a vector of row_num names "testrow_{id}", each element is a &str
let names = (1..=row_num)
.map(|id| format!("testrow_{}", id))
.collect::<Vec<String>>();
let names_str: Vec<&str> = names.iter().map(|name| name.as_str()).collect();

let mut row_group_writer = writer.next_row_group().unwrap();
while let Some(mut col_writer) = row_group_writer.next_column().unwrap() {
// ... write values to a column writer
Expand All @@ -316,8 +330,10 @@ mod tests {
typed_writer.write_batch(&ids, None, None)?;
}
ColumnWriter::ByteArrayColumnWriter(ref mut typed_writer) => {
let byte_array_names: Vec<ByteArray> =
names.iter().map(|&name| ByteArray::from(name)).collect();
let byte_array_names: Vec<ByteArray> = names_str
.iter()
.map(|&name| ByteArray::from(name))
.collect();
typed_writer.write_batch(&byte_array_names, None, None)?;
}
_ => {}
Expand All @@ -337,11 +353,23 @@ mod tests {

file_path.push_str(file_name);
table_file_map.insert(0, file_name.to_string());

// Create a sample parquet file
create_sample_parquet_file(file_name, 10).unwrap();
(
StorageClientImpl::new_for_test(1, table_file_map, "http://localhost:26380", true),
file_name.to_string(),
)
}

create_sample_parquet_file(file_name).unwrap();
fn setup_local_large() -> (StorageClientImpl, String) {
let mut table_file_map: HashMap<TableId, String> = HashMap::new();
let file_name: &str = "sample.parquet";
let mut file_path = StorageClientImpl::local_cache_path();

file_path.push_str(file_name);
table_file_map.insert(0, file_name.to_string());
// Create a sample parquet file
create_sample_parquet_file(file_name, 1000_000).unwrap();
(
StorageClientImpl::new_for_test(1, table_file_map, "http://localhost:26380", true),
file_name.to_string(),
Expand Down Expand Up @@ -416,7 +444,7 @@ mod tests {
}

#[test]
fn test_request_data_table_local() {
fn test_request_data_table_local_simple() {
let (client, _file_name) = setup_local();
let rt: Runtime = Runtime::new().unwrap();
rt.block_on(async {
Expand All @@ -434,6 +462,26 @@ mod tests {
});
}

#[test]
fn test_request_data_table_local_exhaustive() {
let (client, _file_name) = setup_local_large();
let rt: Runtime = Runtime::new().unwrap();
rt.block_on(async {
let mut receiver = client.request_data(StorageRequest::Table(0)).await.unwrap();
// Wait for the channel to be ready
sleep(Duration::from_secs(1)).await;
// Assert that the channel is ready
let mut total_num_rows = 0;
while let Some(rb) = receiver.recv().await {
total_num_rows += rb.num_rows();
}

assert_eq!(total_num_rows, 1000_000);
// println!("RecordBatch: {:?}", record_batch);
// println!("SampleRecordBatch: {:?}", sample_rb);
});
}

#[test]
#[ignore]
fn test_request_data_table_remote() {
Expand Down

0 comments on commit 225c87a

Please sign in to comment.