From 5063f8c06c3270e0b614d3d7f2abf5eeff53eaa6 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Fri, 28 Feb 2025 11:49:31 -0800 Subject: [PATCH] revert small changes --- kernel/src/engine/default/json.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/kernel/src/engine/default/json.rs b/kernel/src/engine/default/json.rs index 36ed67933..b4301cf30 100644 --- a/kernel/src/engine/default/json.rs +++ b/kernel/src/engine/default/json.rs @@ -10,7 +10,7 @@ use crate::arrow::json::ReaderBuilder; use crate::arrow::record_batch::RecordBatch; use bytes::{Buf, Bytes}; use futures::stream::{self, BoxStream}; -use futures::{StreamExt, TryStreamExt}; +use futures::{ready, StreamExt, TryStreamExt}; use object_store::path::Path; use object_store::{DynObjectStore, GetResultPayload}; use tracing::warn; @@ -204,8 +204,7 @@ impl JsonOpener { let batch_size = self.batch_size; let path = Path::from_url_path(file_meta.location.path())?; - let get_result = store.get(&path).await?; - match get_result.payload { + match store.get(&path).await?.payload { GetResultPayload::File(file, _) => { let reader = ReaderBuilder::new(schema) .with_batch_size(batch_size) @@ -220,10 +219,10 @@ impl JsonOpener { let mut input = s.map_err(Error::from); let mut buffered = Bytes::new(); - let stream = futures::stream::poll_fn(move |cx| { + let s = futures::stream::poll_fn(move |cx| { loop { if buffered.is_empty() { - buffered = match futures::ready!(input.poll_next_unpin(cx)) { + buffered = match ready!(input.poll_next_unpin(cx)) { Some(Ok(b)) => b, Some(Err(e)) => return Poll::Ready(Some(Err(e))), None => break, @@ -250,7 +249,7 @@ impl JsonOpener { Poll::Ready(decoder.flush().map_err(Error::from).transpose()) }); - Ok(stream.map_err(Error::from).boxed()) + Ok(s.map_err(Error::from).boxed()) } } }