Skip to content

Commit

Permalink
comments and add warn for error
Browse files Browse the repository at this point in the history
  • Loading branch information
zachschuermann committed Feb 25, 2025
1 parent 494a470 commit ba09853
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions kernel/src/engine/default/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use futures::stream::{self, BoxStream};
use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::{DynObjectStore, GetResultPayload};
use tracing::warn;
use url::Url;

use super::executor::TaskExecutor;
Expand Down Expand Up @@ -91,18 +92,22 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
let readahead = self.readahead;

self.task_executor.spawn(async move {
// an iterator of futures that open each file
let file_futures = files.into_iter().map(|file| file_opener.open(file, None));

// create a stream from that iterator which buffers up to `readahead` futures at a time
let mut stream = stream::iter(file_futures)
.buffered(readahead)
.try_flatten()
.map_ok(|record_batch| {
Box::new(ArrowEngineData::new(record_batch)) as Box<dyn EngineData>
});

// send each record batch over the channel
while let Some(item) = stream.next().await {
// check err?
let _ = tx.send(item);
if tx.send(item).is_err() {
warn!("read_json receiver end of channel dropped before sending completed");
}
}
});

Expand Down

0 comments on commit ba09853

Please sign in to comment.