diff --git a/kernel/src/engine/default/json.rs b/kernel/src/engine/default/json.rs index 4ab429c9c..65c4863d2 100644 --- a/kernel/src/engine/default/json.rs +++ b/kernel/src/engine/default/json.rs @@ -217,12 +217,21 @@ impl JsonOpener { #[cfg(test)] mod tests { + use std::collections::HashMap; use std::path::PathBuf; + use std::sync::Arc; + use std::sync::Mutex; + use std::time::Duration; use crate::arrow::array::{AsArray, RecordBatch, StringArray}; use crate::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; + use futures::future; use itertools::Itertools; use object_store::{local::LocalFileSystem, ObjectStore}; + use object_store::{ + GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, PutMultipartOpts, + PutOptions, PutPayload, PutResult, Result, + }; use super::*; use crate::{ @@ -230,16 +239,6 @@ mod tests { engine::default::executor::tokio::TokioBackgroundExecutor, }; - use std::collections::HashMap; - use std::sync::Arc; - use std::sync::Mutex; - use std::time::Duration; - - use object_store::{ - GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, PutMultipartOpts, - PutOptions, PutPayload, PutResult, Result, - }; - /// Store wrapper that wraps an inner store to purposefully delay GET requests of certain keys. #[derive(Debug)] struct SlowGetStore { @@ -455,9 +454,9 @@ mod tests { ]; let paths = paths.map(|p| std::fs::canonicalize(PathBuf::from(p)).unwrap()); - // Convert &std::path::Path to object_store::path::Path let path_string = paths[0].to_string_lossy().to_string(); let object_store_path = Path::from(path_string); + // for the first 000000.json, wait for 1 second let key_map = (object_store_path, Duration::from_secs(1)); let store = Arc::new(SlowGetStore::new( @@ -465,9 +464,6 @@ mod tests { vec![key_map].into_iter().collect(), )); - // Create a vector of futures, then await them all - use futures::future; - let file_futures: Vec<_> = paths .iter() .map(|path| { @@ -484,11 +480,8 @@ mod tests { } }) .collect(); - - // Await all futures to get a Vec let files = future::join_all(file_futures).await; - // Same as before, but now files is a Vec let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); let physical_schema = Arc::new(ArrowSchema::try_from(get_log_schema().as_ref()).unwrap()); let data: Vec = handler