add small buffer test
zachschuermann committed Feb 28, 2025
1 parent 2cff468 commit 751e582
Showing 1 changed file with 87 additions and 65 deletions.
kernel/src/engine/default/json.rs (152 changes: 87 additions & 65 deletions)
@@ -312,8 +312,8 @@ mod tests {
     }
 
     impl<T: ObjectStore> OrderedGetStore<T> {
-        fn new(inner: T, ordered_keys: impl Into<VecDeque<Path>>) -> Self {
-            let ordered_keys = ordered_keys.into();
+        fn new(inner: T, ordered_keys: &[Path]) -> Self {
+            let ordered_keys: Vec<Path> = ordered_keys.to_vec();
             // Check for duplicates
             let mut seen = HashSet::new();
             for key in ordered_keys.iter() {
@@ -323,7 +323,7 @@
             }
 
             let state = KeysAndWakers {
-                ordered_keys,
+                ordered_keys: ordered_keys.into(),
                 wakers: HashMap::new(),
             };
 
@@ -603,10 +603,7 @@ mod tests {
         }
 
         // Create ordered store with natural order (0, 1, 2, ...)
-        let ordered_store = Arc::new(OrderedGetStore::new(
-            memory_store.fork(),
-            ordered_paths.clone(),
-        ));
+        let ordered_store = Arc::new(OrderedGetStore::new(memory_store, &ordered_paths));
 
         let (tx, rx) = mpsc::channel();
 
@@ -648,76 +645,101 @@
         // 1. we set up a list of FileMetas (and some random JSON content) in order
         // 2. we then set up an ObjectStore to resolve those paths in a jumbled order
         // 3. then call read_json_files and check that the results are in order
         //
-        // note we don't want to go over 1000 since we only buffer 1000 requests at a time
         let ordered_paths: Vec<Path> = (0..1000)
             .map(|i| Path::from(format!("test/path{}", i)))
             .collect();
-        let jumbled_paths: Vec<_> = ordered_paths[100..400]
-            .iter()
-            .chain(ordered_paths[400..].iter().rev())
-            .chain(ordered_paths[..100].iter())
-            .cloned()
-            .collect();
+
+        let test_list: &[(usize, Vec<Path>)] = &[
+            // test 1: buffer_size = 1000, just 1000 jumbled paths
+            (
+                1000, // buffer_size
+                ordered_paths[100..400]
+                    .iter()
+                    .chain(ordered_paths[400..].iter().rev())
+                    .chain(ordered_paths[..100].iter())
+                    .cloned()
+                    .collect(),
+            ),
+            // test 2: buffer_size = 4, jumbled paths in groups of 4
+            (
+                4, // buffer_size
+                (0..250)
+                    .map(|i| {
+                        [
+                            ordered_paths[1 + 4 * i].clone(),
+                            ordered_paths[4 * i].clone(),
+                            ordered_paths[3 + 4 * i].clone(),
+                            ordered_paths[2 + 4 * i].clone(),
+                        ]
+                    })
+                    .flatten()
+                    .collect_vec(),
+            ),
+        ];
+
         let memory_store = InMemory::new();
         for (i, path) in ordered_paths.iter().enumerate() {
             memory_store
                 .put(path, Bytes::from(format!("{{\"val\": {i}}}")).into())
                 .await
                 .unwrap();
         }
-        // set up our ObjectStore to resolve paths in a jumbled order
-        let store = Arc::new(OrderedGetStore::new(memory_store, jumbled_paths));
-
-        // convert the paths to FileMeta
-        let ordered_file_meta: Vec<_> = ordered_paths
-            .iter()
-            .map(|path| {
-                let store = store.clone();
-                async move {
-                    let url = Url::parse(&format!("memory:/{}", path)).unwrap();
-                    let location = Path::from(path.as_ref());
-                    let meta = store.head(&location).await.unwrap();
-                    FileMeta {
-                        location: url,
-                        last_modified: meta.last_modified.timestamp_millis(),
-                        size: meta.size,
+        for (buffer_size, jumbled_paths) in test_list {
+            // set up our ObjectStore to resolve paths in a jumbled order
+            let store = Arc::new(OrderedGetStore::new(memory_store.fork(), jumbled_paths));
+
+            // convert the paths to FileMeta
+            let ordered_file_meta: Vec<_> = ordered_paths
+                .iter()
+                .map(|path| {
+                    let store = store.clone();
+                    async move {
+                        let url = Url::parse(&format!("memory:/{}", path)).unwrap();
+                        let location = Path::from(path.as_ref());
+                        let meta = store.head(&location).await.unwrap();
+                        FileMeta {
+                            location: url,
+                            last_modified: meta.last_modified.timestamp_millis(),
+                            size: meta.size,
+                        }
                     }
-                }
-            })
-            .collect();
-
-        // note: join_all is ordered
-        let files = future::join_all(ordered_file_meta).await;
-
-        // fire off the read_json_files call (for all the files in order)
-        let handler = DefaultJsonHandler::new(
-            store,
-            Arc::new(TokioMultiThreadExecutor::new(
-                tokio::runtime::Handle::current(),
-            )),
-        );
-        let schema = Arc::new(ArrowSchema::new(vec![Arc::new(Field::new(
-            "val",
-            DataType::Int32,
-            true,
-        ))]));
-        let physical_schema = Arc::new(schema.try_into().unwrap());
-        let data: Vec<RecordBatch> = handler
-            .read_json_files(&files, physical_schema, None)
-            .unwrap()
-            .map_ok(into_record_batch)
-            .try_collect()
-            .unwrap();
+                })
+                .collect();
+
+            // note: join_all is ordered
+            let files = future::join_all(ordered_file_meta).await;
+
+            // fire off the read_json_files call (for all the files in order)
+            let handler = DefaultJsonHandler::new(
+                store,
+                Arc::new(TokioMultiThreadExecutor::new(
+                    tokio::runtime::Handle::current(),
+                )),
+            );
+            let handler = handler.with_buffer_size(*buffer_size);
+            let schema = Arc::new(ArrowSchema::new(vec![Arc::new(Field::new(
+                "val",
+                DataType::Int32,
+                true,
+            ))]));
+            let physical_schema = Arc::new(schema.try_into().unwrap());
+            let data: Vec<RecordBatch> = handler
+                .read_json_files(&files, physical_schema, None)
+                .unwrap()
+                .map_ok(into_record_batch)
+                .try_collect()
+                .unwrap();
 
-        // check the order
-        let all_values: Vec<i32> = data
-            .iter()
-            .flat_map(|batch| {
-                let val_col: &Int32Array = batch.column(0).as_primitive();
-                (0..val_col.len()).map(|i| val_col.value(i)).collect_vec()
-            })
-            .collect();
-        assert_eq!(all_values, (0..1000).collect_vec());
+            // check the order
+            let all_values: Vec<i32> = data
+                .iter()
+                .flat_map(|batch| {
+                    let val_col: &Int32Array = batch.column(0).as_primitive();
+                    (0..val_col.len()).map(|i| val_col.value(i)).collect_vec()
+                })
+                .collect();
+            assert_eq!(all_values, (0..1000).collect_vec());
+        }
     }
 }
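For context on what the test harness is doing: per the KeysAndWakers state touched in the first two hunks, OrderedGetStore holds each GET until its path reaches the front of ordered_keys, parking the caller's waker and waking it when its turn arrives. Below is a minimal standalone sketch of that gating idea. It is illustrative only, not the repository's implementation; the TurnQueue and wait_for_turn names are hypothetical, std::future::poll_fn does the polling, and tokio is used only for the demo main.

use std::collections::{HashMap, VecDeque};
use std::future::poll_fn;
use std::sync::{Arc, Mutex};
use std::task::{Poll, Waker};

// Illustrative stand-in for the harness's gating logic: wait_for_turn(key)
// completes only once `key` reaches the front of the predetermined order,
// regardless of when callers arrive.
#[derive(Clone)]
struct TurnQueue {
    // (remaining keys in completion order, parked wakers keyed by path)
    state: Arc<Mutex<(VecDeque<String>, HashMap<String, Waker>)>>,
}

impl TurnQueue {
    fn new(keys: &[&str]) -> Self {
        let order = keys.iter().map(|k| k.to_string()).collect();
        TurnQueue {
            state: Arc::new(Mutex::new((order, HashMap::new()))),
        }
    }

    async fn wait_for_turn(&self, key: &str) {
        poll_fn(|cx| {
            let mut guard = self.state.lock().unwrap();
            let (order, wakers) = &mut *guard;
            if order.front().map_or(false, |k| k == key) {
                // Our turn: pop ourselves, then wake the next key if it is parked.
                order.pop_front();
                if let Some(next) = order.front() {
                    if let Some(waker) = wakers.remove(next) {
                        waker.wake();
                    }
                }
                Poll::Ready(())
            } else {
                // Not our turn yet: park this task's waker under our key.
                wakers.insert(key.to_string(), cx.waker().clone());
                Poll::Pending
            }
        })
        .await
    }
}

#[tokio::main]
async fn main() {
    // Tasks are spawned in the order c, b, a but complete in the order a, b, c.
    let q = TurnQueue::new(&["a", "b", "c"]);
    let handles: Vec<_> = ["c", "b", "a"]
        .into_iter()
        .map(|key| {
            let q = q.clone();
            tokio::spawn(async move {
                q.wait_for_turn(key).await;
                println!("{key} done");
            })
        })
        .collect();
    for h in handles {
        h.await.unwrap();
    }
}

Because the store, not the reader, dictates completion order, the test can feed read_json_files a deliberately jumbled resolution order and assert that batches still come back in request order, even when with_buffer_size(4) limits how many requests are buffered at a time.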
