From 3316d8d89caad0e9190284875088e793485dec36 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Fri, 28 Feb 2025 12:11:50 -0800 Subject: [PATCH] address feedback --- kernel/Cargo.toml | 2 +- kernel/src/engine/default/json.rs | 21 ++++++++------------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index 2efb740af..46a34effe 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -150,6 +150,7 @@ rustc_version = "0.4.1" [dev-dependencies] delta_kernel = { path = ".", features = ["arrow", "default-engine", "sync-engine"] } test_utils = { path = "../test-utils" } +async-trait = "0.1" # only used for our custom SlowGetStore ObjectStore implementation paste = "1.0" test-log = { version = "0.2", default-features = false, features = ["trace"] } tempfile = "3" @@ -159,4 +160,3 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [ "env-filter", "fmt", ] } -async-trait = "0.1" # only used for our custom SlowGetStore ObjectStore implementation diff --git a/kernel/src/engine/default/json.rs b/kernel/src/engine/default/json.rs index b4301cf30..a3e15656b 100644 --- a/kernel/src/engine/default/json.rs +++ b/kernel/src/engine/default/json.rs @@ -129,8 +129,8 @@ impl JsonHandler for DefaultJsonHandler { let mut stream = stream::iter(file_futures) .buffered(buffer_size) .try_flatten() - .map_ok(|record_batch| { - Box::new(ArrowEngineData::new(record_batch)) as Box + .map_ok(|record_batch| -> Box { + Box::new(ArrowEngineData::new(record_batch)) }); // send each record batch over the channel @@ -303,7 +303,7 @@ mod tests { state: Mutex, } - #[derive(Debug, Default)] + #[derive(Debug)] struct KeysAndWakers { // Queue of paths in order which they will resolve ordered_keys: VecDeque, @@ -556,11 +556,7 @@ mod tests { let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); let physical_schema = Arc::new(ArrowSchema::try_from(get_log_schema().as_ref()).unwrap()); let data: Vec = handler - .read_json_files( - files, - Arc::new(physical_schema.clone().try_into().unwrap()), - None, - ) + .read_json_files(files, get_log_schema().clone(), None) .unwrap() .map_ok(into_record_batch) .try_collect() @@ -629,8 +625,9 @@ mod tests { future::join_all(handles).await; drop(tx); - // NB (from mpsc::IntoIter): This iterator will block whenever next is called, waiting for - // a new message, and None will be returned if the corresponding channel has hung up. + // NB (from mpsc::Receiver::recv): This function will always block the current thread if + // there is no data available and it's possible for more data to be sent (at least one + // sender still exists). let mut completed = Vec::new(); while let Ok(path) = rx.recv() { completed.push(path); @@ -717,9 +714,7 @@ mod tests { .iter() .flat_map(|batch| { let val_col: &Int32Array = batch.column(0).as_primitive(); - (0..val_col.len()) - .map(|i| val_col.value(i)) - .collect::>() + (0..val_col.len()).map(|i| val_col.value(i)).collect_vec() }) .collect(); assert_eq!(all_values, (0..1000).collect_vec());