refactor: use builder pattern to create ArrowReader
sdd committed Mar 4, 2024
1 parent 4377cdc · commit c3631dd
Showing 2 changed files with 50 additions and 19 deletions.

crates/iceberg/src/arrow.rs (41 additions, 16 deletions)
@@ -25,8 +25,39 @@ use crate::io::FileIO;
 use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
 use crate::spec::SchemaRef;
 
-/// Default arrow record batch size
-const DEFAULT_BATCH_SIZE: usize = 1024;
+/// Builder to create ArrowReader
+pub struct ArrowReaderBuilder {
+    batch_size: Option<usize>,
+    file_io: FileIO,
+    schema: SchemaRef,
+}
+
+impl ArrowReaderBuilder {
+    /// Create a new ArrowReaderBuilder
+    pub fn new(file_io: FileIO, schema: SchemaRef) -> Self {
+        ArrowReaderBuilder {
+            batch_size: None,
+            file_io,
+            schema,
+        }
+    }
+
+    /// Sets the desired size of batches in the response
+    /// to something other than the default
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = Some(batch_size);
+        self
+    }
+
+    /// Build the ArrowReader.
+    pub fn build(self) -> ArrowReader {
+        ArrowReader {
+            batch_size: self.batch_size,
+            schema: self.schema,
+            file_io: self.file_io,
+        }
+    }
+}
 
 /// Reads data from Parquet files
 pub struct ArrowReader {
@@ -37,20 +68,10 @@ pub struct ArrowReader {
 }
 
 impl ArrowReader {
-    /// Constructs a new ArrowReader
-    pub fn new(file_io: FileIO, schema: SchemaRef, batch_size: Option<usize>) -> Self {
-        ArrowReader {
-            batch_size,
-            file_io,
-            schema,
-        }
-    }
-
     /// Take a stream of FileScanTasks and reads all the files.
     /// Returns a stream of Arrow RecordBatches containing the data from the files
     pub fn read(self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
         let file_io = self.file_io.clone();
-        let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
 
         Ok(try_stream! {
             while let Some(Ok(task)) = tasks.next().await {
@@ -62,11 +83,15 @@ impl ArrowReader {
                     .reader()
                     .await?;
 
-                let mut batch_stream = ParquetRecordBatchStreamBuilder::new(parquet_reader)
+                let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(parquet_reader)
                     .await?
-                    .with_batch_size(batch_size)
-                    .with_projection(projection_mask)
-                    .build()?;
+                    .with_projection(projection_mask);
+
+                if let Some(batch_size) = self.batch_size {
+                    batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
+                }
+
+                let mut batch_stream = batch_stream_builder.build()?;
 
                 while let Some(batch) = batch_stream.next().await {
                     yield batch?;
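
For illustration, a minimal sketch of the new call-site pattern, assuming the code lives inside the iceberg crate (hence the crate-internal paths) and that file_io, schema, and tasks are supplied by the caller; only the APIs introduced in this diff are used:

    use crate::arrow::ArrowReaderBuilder;
    use crate::io::FileIO;
    use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
    use crate::spec::SchemaRef;

    // Sketch: build a reader, setting a batch size only when one was requested.
    fn arrow_stream(
        file_io: FileIO,
        schema: SchemaRef,
        tasks: FileScanTaskStream,
        batch_size: Option<usize>, // None defers to the Parquet reader's default
    ) -> crate::Result<ArrowRecordBatchStream> {
        let mut builder = ArrowReaderBuilder::new(file_io, schema);
        if let Some(batch_size) = batch_size {
            builder = builder.with_batch_size(batch_size);
        }
        builder.build().read(tasks)
    }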

crates/iceberg/src/scan.rs (9 additions, 3 deletions)
@@ -17,7 +17,7 @@
 
 //! Table scan api.
-use crate::arrow::ArrowReader;
+use crate::arrow::ArrowReaderBuilder;
 use crate::io::FileIO;
 use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadataRef};
 use crate::table::Table;
@@ -177,8 +177,14 @@ impl TableScan {
     }
 
     pub async fn to_arrow(&self) -> crate::Result<ArrowRecordBatchStream> {
-        ArrowReader::new(self.file_io.clone(), self.schema.clone(), self.batch_size)
-            .read(self.plan_files().await?)
+        let mut arrow_reader_builder =
+            ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone());
+
+        if let Some(batch_size) = self.batch_size {
+            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
+        }
+
+        arrow_reader_builder.build().read(self.plan_files().await?)
     }
 }
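
Note on behavior: with DEFAULT_BATCH_SIZE removed, an unset batch_size no longer falls back to a crate-local constant of 1024; ArrowReader::read simply skips with_batch_size, leaving the ParquetRecordBatchStreamBuilder default in effect.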
