Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
zachschuermann committed Feb 28, 2025
1 parent 5063f8c commit 3316d8d
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 14 deletions.
2 changes: 1 addition & 1 deletion kernel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ rustc_version = "0.4.1"
[dev-dependencies]
delta_kernel = { path = ".", features = ["arrow", "default-engine", "sync-engine"] }
test_utils = { path = "../test-utils" }
async-trait = "0.1" # only used for our custom SlowGetStore ObjectStore implementation
paste = "1.0"
test-log = { version = "0.2", default-features = false, features = ["trace"] }
tempfile = "3"
Expand All @@ -159,4 +160,3 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [
"env-filter",
"fmt",
] }
async-trait = "0.1" # only used for our custom SlowGetStore ObjectStore implementation
21 changes: 8 additions & 13 deletions kernel/src/engine/default/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
let mut stream = stream::iter(file_futures)
.buffered(buffer_size)
.try_flatten()
.map_ok(|record_batch| {
Box::new(ArrowEngineData::new(record_batch)) as Box<dyn EngineData>
.map_ok(|record_batch| -> Box<dyn EngineData> {
Box::new(ArrowEngineData::new(record_batch))
});

// send each record batch over the channel
Expand Down Expand Up @@ -303,7 +303,7 @@ mod tests {
state: Mutex<KeysAndWakers>,
}

#[derive(Debug, Default)]
#[derive(Debug)]
struct KeysAndWakers {
// Queue of paths in order which they will resolve
ordered_keys: VecDeque<Path>,
Expand Down Expand Up @@ -556,11 +556,7 @@ mod tests {
let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
let physical_schema = Arc::new(ArrowSchema::try_from(get_log_schema().as_ref()).unwrap());
let data: Vec<RecordBatch> = handler
.read_json_files(
files,
Arc::new(physical_schema.clone().try_into().unwrap()),
None,
)
.read_json_files(files, get_log_schema().clone(), None)
.unwrap()
.map_ok(into_record_batch)
.try_collect()
Expand Down Expand Up @@ -629,8 +625,9 @@ mod tests {
future::join_all(handles).await;
drop(tx);

// NB (from mpsc::IntoIter): This iterator will block whenever next is called, waiting for
// a new message, and None will be returned if the corresponding channel has hung up.
// NB (from mpsc::Receiver::recv): This function will always block the current thread if
// there is no data available and it's possible for more data to be sent (at least one
// sender still exists).
let mut completed = Vec::new();
while let Ok(path) = rx.recv() {
completed.push(path);
Expand Down Expand Up @@ -717,9 +714,7 @@ mod tests {
.iter()
.flat_map(|batch| {
let val_col: &Int32Array = batch.column(0).as_primitive();
(0..val_col.len())
.map(|i| val_col.value(i))
.collect::<Vec<_>>()
(0..val_col.len()).map(|i| val_col.value(i)).collect_vec()
})
.collect();
assert_eq!(all_values, (0..1000).collect_vec());
Expand Down

0 comments on commit 3316d8d

Please sign in to comment.