diff --git a/kernel/src/engine/default/json.rs b/kernel/src/engine/default/json.rs index 65c4863d2..62b7846e8 100644 --- a/kernel/src/engine/default/json.rs +++ b/kernel/src/engine/default/json.rs @@ -13,6 +13,7 @@ use futures::stream::{self, BoxStream}; use futures::{StreamExt, TryStreamExt}; use object_store::path::Path; use object_store::{DynObjectStore, GetResultPayload}; +use tracing::warn; use url::Url; use super::executor::TaskExecutor; @@ -91,8 +92,10 @@ impl JsonHandler for DefaultJsonHandler { let readahead = self.readahead; self.task_executor.spawn(async move { + // an iterator of futures that open each file let file_futures = files.into_iter().map(|file| file_opener.open(file, None)); + // create a stream from that iterator which buffers up to `readahead` futures at a time let mut stream = stream::iter(file_futures) .buffered(readahead) .try_flatten() @@ -100,9 +103,11 @@ impl JsonHandler for DefaultJsonHandler { Box::new(ArrowEngineData::new(record_batch)) as Box }); + // send each record batch over the channel while let Some(item) = stream.next().await { - // check err? - let _ = tx.send(item); + if tx.send(item).is_err() { + warn!("read_json receiver end of channel dropped before sending completed"); + } } });