This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Merge pull request #9 from xx01cyx/yx/request-id
feat: add request id for storage request type
J-HowHuang authored May 1, 2024
2 parents e23e3a6 + 71daa79 commit 9a317a1
Showing 3 changed files with 82 additions and 43 deletions.
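At a glance, the PR turns `StorageRequest` from a plain enum into a struct pairing a unique `RequestId` with the data being requested, which is now modeled by a separate `DataRequest` enum. A minimal before/after sketch of the call-site change, using only types and constructors that appear in the diff below (the table id `0` is a placeholder):

```rust
use istziio_client::client_api::{DataRequest, StorageRequest};

fn main() {
    // Before this PR, a request named the data directly:
    // let req = StorageRequest::Table(0);

    // After this PR, the caller mints a request id and wraps the data request:
    let req = StorageRequest::new(0, DataRequest::Table(0));
    assert_eq!(req.request_id(), 0);
}
```

Since `RequestId` is a plain `usize`, uniqueness is the caller's responsibility; that is why `load_gen_skewed` below assigns `table_ids.len()` to the extra duplicate request instead of reusing an earlier id.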
client/src/benchmark.rs (25 changes: 14 additions & 11 deletions)
@@ -1,4 +1,4 @@
-use istziio_client::client_api::{StorageClient, StorageRequest, TableId};
+use istziio_client::client_api::{DataRequest, StorageClient, StorageRequest, TableId};
use istziio_client::storage_client::StorageClientImpl;
use std::collections::HashMap;
use std::path::Path;
@@ -65,8 +65,8 @@ async fn parallel_load_run(clients: Vec<Box<dyn StorageClient>>, requests: Vec<S
let client_start = Instant::now();
let req = &requests[client_id];
// for req in &requests {
-        let table_id = match req {
-            StorageRequest::Table(id) => id,
+        let table_id = match req.data_request() {
+            DataRequest::Table(id) => id,
_ => panic!("Invalid request type"),
};
println!(
@@ -101,8 +101,8 @@ async fn load_run(client: &dyn StorageClient, requests: Vec<StorageRequest>) {
println!("------------Start running workload [SEQUENTIALLY]!------------");
let start = Instant::now();
for req in requests {
-        let id = match req {
-            StorageRequest::Table(id) => id,
+        let id = match req.data_request() {
+            DataRequest::Table(id) => id.to_owned(),
_ => panic!("Invalid request type"),
};
println!("Requesting data for table {:?}", id);
@@ -118,8 +118,8 @@ async fn load_run(client: &dyn StorageClient, requests: Vec<StorageRequest>) {
// Generate a load of requests for all tables at once
fn load_gen_allonce(table_ids: Vec<TableId>) -> Vec<StorageRequest> {
let mut requests = Vec::new();
-    for table_id in table_ids {
-        requests.push(StorageRequest::Table(table_id));
+    for (req_id, table_id) in table_ids.into_iter().enumerate() {
+        requests.push(StorageRequest::new(req_id, DataRequest::Table(table_id)));
}
requests
}
@@ -130,12 +130,15 @@ fn load_gen_allonce(table_ids: Vec<TableId>) -> Vec<StorageRequest> {
fn load_gen_skewed(table_ids: Vec<TableId>) -> Vec<StorageRequest> {
// read the first table id twice, and the last table id zero times
let mut requests = Vec::new();
-    for table_id in &table_ids {
-        requests.push(StorageRequest::Table(table_id.clone()));
+    for (req_id, table_id) in table_ids.iter().enumerate() {
+        requests.push(StorageRequest::new(req_id, DataRequest::Table(*table_id)));
}
// remove last element
requests.pop();
-    requests.push(StorageRequest::Table(table_ids[0]));
+    requests.push(StorageRequest::new(
+        table_ids.len(),
+        DataRequest::Table(table_ids[0]),
+    ));

requests
}
@@ -152,7 +155,7 @@ fn setup_clients(
let mut clients = Vec::new();
for i in 0..num_clients {
let client = Box::new(StorageClientImpl::new_for_test(
-            i as usize,
+            i,
table_file_map.clone(),
server_url,
false,
client/src/client_api.rs (31 changes: 29 additions & 2 deletions)
@@ -9,13 +9,40 @@ pub type TableId = u64;
pub type ColumnId = u64;
pub type RecordId = u64;

-/// [`StorageRequest`] specifies the requests that the execution engine might issue to
+/// Id type for the request. Should be unique among all requests.
+pub type RequestId = usize;
+
+/// [`StorageRequest`] is the request that the execution engine sends to the storage node.
+#[derive(Clone)]
+pub struct StorageRequest {
+    request_id: RequestId,
+    data_request: DataRequest,
+}
+
+impl StorageRequest {
+    pub fn new(request_id: RequestId, data_request: DataRequest) -> Self {
+        Self {
+            request_id,
+            data_request,
+        }
+    }
+
+    pub fn request_id(&self) -> RequestId {
+        self.request_id
+    }
+
+    pub fn data_request(&self) -> &DataRequest {
+        &self.data_request
+    }
+}
+
+/// [`DataRequest`] specifies the requests that the execution engine might issue to
/// the storage node.
///
/// Currently we assume the execution engine only requests the whole table/column. We may
/// add `std::ops::RangeBounds` later to support range query from the execution engine.
#[derive(Clone)]
-pub enum StorageRequest {
+pub enum DataRequest {
/// Requests a whole table from the underlying storage.
Table(TableId),
/// Requests one or more columns from the underlying storage.
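On the consumer side, the id and the payload travel together and are read back through the two accessors. A short sketch of the matching pattern that storage_client.rs below adopts (`describe` is a hypothetical helper, not part of the crate):

```rust
use istziio_client::client_api::{DataRequest, StorageRequest};

// Hypothetical helper: summarize a request by its unique id and payload.
fn describe(req: &StorageRequest) -> String {
    match req.data_request() {
        DataRequest::Table(table_id) => {
            format!("request {} asks for table {}", req.request_id(), table_id)
        }
        // Columns and Tuple requests exist but are unimplemented server-side.
        _ => format!("request {} uses an unsupported variant", req.request_id()),
    }
}
```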
client/src/storage_client.rs (69 changes: 39 additions & 30 deletions)
@@ -10,7 +10,7 @@ use std::path::Path;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::task;

-use crate::client_api;
+use crate::client_api::{self, DataRequest};

/// Client for fetching data from the I/O service
pub struct StorageClientImpl {
@@ -56,7 +56,7 @@ impl StorageClientImpl {
table_file_map: map,
server_url: server_url.to_string(),
local_cache: cache,
-            use_local_cache: use_local_cache,
+            use_local_cache,
}
}

@@ -147,7 +147,7 @@ impl StorageClientImpl {
let trimmed_path: Vec<&str> = file_path.split('/').collect();
let file_name = trimmed_path.last().ok_or_else(|| {
// Return an error if the last element does not exist
-            return anyhow::Error::msg("File path is empty");
+            anyhow::Error::msg("File path is empty")
})?;

let url = format!("{}/s3/{}", self.server_url, file_name);
@@ -193,11 +193,11 @@ impl StorageClientImpl {
Ok(())
}

-    async fn read_pqt_all(file_path: &String, sender: Sender<RecordBatch>) -> Result<()> {
+    async fn read_pqt_all(file_path: &str, sender: Sender<RecordBatch>) -> Result<()> {
// If the file exists, open it and read the data. Otherwise, call fetch_file to get the file
let mut local_path = StorageClientImpl::local_cache_path();

-        local_path.push_str(&file_path);
+        local_path.push_str(file_path);
let file = File::open(local_path)?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
let mut reader = builder.build()?;
Expand All @@ -207,12 +207,12 @@ impl StorageClientImpl {
Ok(())
}

-    async fn read_pqt_all_sync(file_path: &String) -> Result<Vec<RecordBatch>> {
+    async fn read_pqt_all_sync(file_path: &str) -> Result<Vec<RecordBatch>> {
// If the file exists, open it and read the data. Otherwise, call fetch_file to get the file
let mut local_path = StorageClientImpl::local_cache_path();
// record the start time
let start = std::time::Instant::now();
-        local_path.push_str(&file_path);
+        local_path.push_str(file_path);
print!(
"read_pqt_all_sync Reading from local_path: {:?}",
local_path
@@ -233,26 +233,26 @@

#[async_trait::async_trait]
impl StorageClient for StorageClientImpl {
-    async fn request_data(&self, _request: StorageRequest) -> Result<Receiver<RecordBatch>> {
-        match _request {
-            StorageRequest::Table(table_id) => self.read_entire_table(table_id).await,
+    async fn request_data(&self, request: StorageRequest) -> Result<Receiver<RecordBatch>> {
+        match request.data_request() {
+            DataRequest::Table(table_id) => self.read_entire_table(*table_id).await,

-            StorageRequest::Columns(_table_id, _column_ids) => {
+            DataRequest::Columns(_table_id, _column_ids) => {
unimplemented!("Column request is not supported yet")
}
-            StorageRequest::Tuple(_record_ids) => {
+            DataRequest::Tuple(_record_ids) => {
unimplemented!("Tuple request is not supported yet")
}
}
}

-    async fn request_data_sync(&self, _request: StorageRequest) -> Result<Vec<RecordBatch>> {
-        match _request {
-            StorageRequest::Table(table_id) => self.read_entire_table_sync(table_id).await,
-            StorageRequest::Columns(_table_id, _column_ids) => {
+    async fn request_data_sync(&self, request: StorageRequest) -> Result<Vec<RecordBatch>> {
+        match request.data_request() {
+            DataRequest::Table(table_id) => self.read_entire_table_sync(*table_id).await,
+            DataRequest::Columns(_table_id, _column_ids) => {
unimplemented!("Column request is not supported yet")
}
-            StorageRequest::Tuple(_record_ids) => {
+            DataRequest::Tuple(_record_ids) => {
unimplemented!("Tuple request is not supported yet")
}
}
@@ -280,17 +280,15 @@ mod tests {
Field::new("name", DataType::Utf8, false),
]);
let row_num = 10;
-        let ids_vec = (1..=row_num as i32).collect::<Vec<i32>>();
+        let ids_vec = (1..=row_num).collect::<Vec<i32>>();

// names is a vector of row_num names "testrow_{id}", each element is a String
let names_vec = (1..=row_num)
.map(|id| format!("testrow_{}", id))
.collect::<Vec<String>>();
let ids = Int32Array::from(ids_vec);
let names = StringArray::from(names_vec);
-        let batch =
-            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(ids), Arc::new(names)]).unwrap();
-        batch
+        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(ids), Arc::new(names)]).unwrap()
}

fn create_sample_parquet_file(file_name: &str, row_num: usize) -> anyhow::Result<()> {
@@ -308,7 +306,7 @@
}
";
let schema = Arc::new(parse_message_type(message_type).unwrap());
-        let file = fs::File::create(&path).unwrap();
+        let file = fs::File::create(path).unwrap();

let props: WriterProperties = WriterProperties::builder().build();
let mut writer = SerializedFileWriter::new(file, schema, Arc::new(props))?;
@@ -369,7 +367,7 @@ mod tests {
file_path.push_str(file_name);
table_file_map.insert(0, file_name.to_string());
// Create a sample parquet file
-        create_sample_parquet_file(file_name, 1000_000).unwrap();
+        create_sample_parquet_file(file_name, 1_000_000).unwrap();
(
StorageClientImpl::new_for_test(1, table_file_map, "http://localhost:26380", true),
file_name.to_string(),
@@ -413,7 +411,7 @@ mod tests {
let res = client.read_entire_table_sync(0).await;
assert!(res.is_ok());
let record_batch = res.unwrap();
-            let rb = record_batch.get(0);
+            let rb = record_batch.first();
assert!(rb.is_some());
let sample_rb = create_sample_rb();
assert_eq!(rb.unwrap().clone(), sample_rb);
@@ -448,7 +446,10 @@ mod tests {
let (client, _file_name) = setup_local();
let rt: Runtime = Runtime::new().unwrap();
rt.block_on(async {
-            let mut receiver = client.request_data(StorageRequest::Table(0)).await.unwrap();
+            let mut receiver = client
+                .request_data(StorageRequest::new(0, DataRequest::Table(0)))
+                .await
+                .unwrap();
// Wait for the channel to be ready
sleep(Duration::from_secs(1)).await;
// Assert that the channel is ready
@@ -467,7 +468,10 @@ mod tests {
let (client, _file_name) = setup_local_large();
let rt: Runtime = Runtime::new().unwrap();
rt.block_on(async {
-            let mut receiver = client.request_data(StorageRequest::Table(0)).await.unwrap();
+            let mut receiver = client
+                .request_data(StorageRequest::new(0, DataRequest::Table(0)))
+                .await
+                .unwrap();
// Wait for the channel to be ready
sleep(Duration::from_secs(1)).await;
// Assert that the channel is ready
Expand All @@ -476,7 +480,7 @@ mod tests {
total_num_rows += rb.num_rows();
}

-            assert_eq!(total_num_rows, 1000_000);
+            assert_eq!(total_num_rows, 1_000_000);
// println!("RecordBatch: {:?}", record_batch);
// println!("SampleRecordBatch: {:?}", sample_rb);
});
@@ -488,7 +492,10 @@ mod tests {
let client = setup_remote();
let rt = Runtime::new().unwrap();
rt.block_on(async {
-            let mut receiver = client.request_data(StorageRequest::Table(0)).await.unwrap();
+            let mut receiver = client
+                .request_data(StorageRequest::new(0, DataRequest::Table(0)))
+                .await
+                .unwrap();
// Wait for the channel to be ready
sleep(Duration::from_secs(1)).await;
// Assert that the channel is ready
@@ -507,10 +514,12 @@ mod tests {
let (client, _file_name) = setup_local();
let rt: Runtime = Runtime::new().unwrap();
rt.block_on(async {
-            let res = client.request_data_sync(StorageRequest::Table(0)).await;
+            let res = client
+                .request_data_sync(StorageRequest::new(0, DataRequest::Table(0)))
+                .await;
assert!(res.is_ok());
let record_batch = res.unwrap();
-            let rb = record_batch.get(0);
+            let rb = record_batch.first();
assert!(rb.is_some());
let sample_rb = create_sample_rb();
assert_eq!(rb.unwrap().clone(), sample_rb);
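Putting the pieces together, a hedged end-to-end sketch of issuing one of the new requests, modeled on the tests above; the file name, table id, and server URL are placeholders, `new_for_test` and the channel-returning `request_data` come from this diff, and a tokio runtime with the `macros` feature is assumed:

```rust
use std::collections::HashMap;

use anyhow::Result;
use istziio_client::client_api::{DataRequest, StorageClient, StorageRequest};
use istziio_client::storage_client::StorageClientImpl;

#[tokio::main]
async fn main() -> Result<()> {
    // Map table 0 to a parquet file in the local cache, as the tests do.
    let mut table_file_map = HashMap::new();
    table_file_map.insert(0, "sample.parquet".to_string());
    let client =
        StorageClientImpl::new_for_test(1, table_file_map, "http://localhost:26380", true);

    // Request id 0 identifies this request; DataRequest::Table(0) names the data.
    let mut receiver = client
        .request_data(StorageRequest::new(0, DataRequest::Table(0)))
        .await?;
    while let Some(batch) = receiver.recv().await {
        println!("received a batch of {} rows", batch.num_rows());
    }
    Ok(())
}
```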

