diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index a1d3378181c28..99ca7ac165236 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -32,7 +32,8 @@ use crate::metrics::{BaselineMetrics, RecordOutput}; use crate::sorts::sort::sort_batch; use crate::sorts::streaming_merge; use crate::stream::RecordBatchStreamAdapter; -use crate::{aggregates, read_spill_as_stream, ExecutionPlan, PhysicalExpr}; +use crate::{aggregates, ExecutionPlan, PhysicalExpr}; +use crate::spill::read_spill_as_stream; use crate::{RecordBatchStream, SendableRecordBatchStream}; use arrow::array::*; diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index dc736993a4533..7311755425f35 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -21,31 +21,41 @@ use std::any::Any; use std::fmt::Debug; -use std::fs::File; -use std::io::BufReader; -use std::path::{Path, PathBuf}; use std::sync::Arc; -use crate::coalesce_partitions::CoalescePartitionsExec; -use crate::display::DisplayableExecutionPlan; -use crate::metrics::MetricsSet; -use crate::repartition::RepartitionExec; -use crate::sorts::sort_preserving_merge::SortPreservingMergeExec; - use arrow::datatypes::SchemaRef; -use arrow::ipc::reader::FileReader; use arrow::record_batch::RecordBatch; +use futures::stream::{StreamExt, TryStreamExt}; +use tokio::task::JoinSet; + +use datafusion_common::{exec_err, Result}; +pub use datafusion_common::{ColumnStatistics, internal_err, Statistics}; use datafusion_common::config::ConfigOptions; -use datafusion_common::{exec_datafusion_err, exec_err, Result}; +pub use datafusion_common::hash_utils; +pub use datafusion_common::utils::project_schema; +pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; use datafusion_execution::TaskContext; +pub use datafusion_expr::{Accumulator, ColumnarValue}; use datafusion_physical_expr::{ EquivalenceProperties, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement, }; +pub use datafusion_physical_expr::{ + AggregateExpr, Distribution, expressions, functions, Partitioning, PhysicalExpr, udf, +}; +pub use datafusion_physical_expr::window::WindowExpr; -use futures::stream::{StreamExt, TryStreamExt}; -use log::debug; -use tokio::sync::mpsc::Sender; -use tokio::task::JoinSet; +use crate::coalesce_partitions::CoalescePartitionsExec; +pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; +use crate::display::DisplayableExecutionPlan; +pub use crate::metrics::Metric; +use crate::metrics::MetricsSet; +pub use crate::ordering::InputOrderMode; +use crate::repartition::RepartitionExec; +use crate::sorts::sort_preserving_merge::SortPreservingMergeExec; +pub use crate::stream::EmptyRecordBatchStream; +use crate::stream::RecordBatchStreamAdapter; +pub use crate::topk::TopK; +pub use crate::visitor::{accept, ExecutionPlanVisitor, visit_execution_plan}; mod ordering; mod topk; @@ -70,6 +80,7 @@ pub mod projection; pub mod recursive_query; pub mod repartition; pub mod sorts; +pub mod spill; pub mod stream; pub mod streaming; pub mod tree_node; @@ -79,32 +90,9 @@ pub mod values; pub mod windows; pub mod work_table; -pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; -pub use crate::metrics::Metric; -pub use crate::ordering::InputOrderMode; -pub use crate::topk::TopK; -pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor}; - -pub use datafusion_common::hash_utils; -pub use datafusion_common::utils::project_schema; -pub use datafusion_common::{internal_err, ColumnStatistics, Statistics}; -pub use datafusion_expr::{Accumulator, ColumnarValue}; -pub use datafusion_physical_expr::window::WindowExpr; -pub use datafusion_physical_expr::{ - expressions, functions, udf, AggregateExpr, Distribution, Partitioning, PhysicalExpr, -}; - -// Backwards compatibility -use crate::common::IPCWriter; -pub use crate::stream::EmptyRecordBatchStream; -use crate::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter}; -use datafusion_execution::disk_manager::RefCountedTempFile; -use datafusion_execution::memory_pool::human_readable_size; -pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; - pub mod udaf { pub use datafusion_physical_expr_common::aggregate::{ - create_aggregate_expr, AggregateFunctionExpr, + AggregateFunctionExpr, create_aggregate_expr, }; } @@ -903,56 +891,6 @@ pub fn get_plan_string(plan: &Arc) -> Vec { actual.iter().map(|elem| elem.to_string()).collect() } -/// Read spilled batches from the disk -/// -/// `path` - temp file -/// `schema` - batches schema, should be the same across batches -/// `buffer` - internal buffer of capacity batches -pub fn read_spill_as_stream( - path: RefCountedTempFile, - schema: SchemaRef, - buffer: usize, -) -> Result { - let mut builder = RecordBatchReceiverStream::builder(schema, buffer); - let sender = builder.tx(); - - builder.spawn_blocking(move || read_spill(sender, path.path())); - - Ok(builder.build()) -} - -/// Spills in-memory `batches` to disk. -/// -/// Returns total number of the rows spilled to disk. -pub fn spill_record_batches( - batches: Vec, - path: PathBuf, - schema: SchemaRef, -) -> Result { - let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?; - for batch in batches { - writer.write(&batch)?; - } - writer.finish()?; - debug!( - "Spilled {} batches of total {} rows to disk, memory released {}", - writer.num_batches, - writer.num_rows, - human_readable_size(writer.num_bytes), - ); - Ok(writer.num_rows) -} - -fn read_spill(sender: Sender>, path: &Path) -> Result<()> { - let file = BufReader::new(File::open(path)?); - let reader = FileReader::try_new(file, None)?; - for batch in reader { - sender - .blocking_send(batch.map_err(Into::into)) - .map_err(|e| exec_datafusion_err!("{e}"))?; - } - Ok(()) -} #[cfg(test)] mod tests { @@ -960,6 +898,7 @@ mod tests { use std::sync::Arc; use arrow_schema::{Schema, SchemaRef}; + use datafusion_common::{Result, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index f347a0f5b6d56..e1588b587824b 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -33,11 +33,12 @@ use crate::sorts::streaming_merge::streaming_merge; use crate::stream::RecordBatchStreamAdapter; use crate::topk::TopK; use crate::{ - read_spill_as_stream, spill_record_batches, DisplayAs, DisplayFormatType, + DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionMode, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, }; +use crate::spill::{read_spill_as_stream, spill_record_batches}; use arrow::compute::{concat_batches, lexsort_to_indices, take, SortColumn}; use arrow::datatypes::SchemaRef; diff --git a/datafusion/physical-plan/src/spill.rs b/datafusion/physical-plan/src/spill.rs new file mode 100644 index 0000000000000..be14c148e27ee --- /dev/null +++ b/datafusion/physical-plan/src/spill.rs @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines the spilling functions + +use std::fs::File; +use std::io::BufReader; +use std::path::{Path, PathBuf}; + +use arrow::datatypes::SchemaRef; +use arrow::ipc::reader::FileReader; +use arrow::record_batch::RecordBatch; +use log::debug; +use tokio::sync::mpsc::Sender; + +use datafusion_common::{exec_datafusion_err, Result}; +use datafusion_execution::SendableRecordBatchStream; +use datafusion_execution::disk_manager::RefCountedTempFile; +use datafusion_execution::memory_pool::human_readable_size; + +use crate::common::IPCWriter; +use crate::stream::RecordBatchReceiverStream; + +/// Read spilled batches from the disk +/// +/// `path` - temp file +/// `schema` - batches schema, should be the same across batches +/// `buffer` - internal buffer of capacity batches +pub fn read_spill_as_stream( + path: RefCountedTempFile, + schema: SchemaRef, + buffer: usize, +) -> Result { + let mut builder = RecordBatchReceiverStream::builder(schema, buffer); + let sender = builder.tx(); + + builder.spawn_blocking(move || read_spill(sender, path.path())); + + Ok(builder.build()) +} + +/// Spills in-memory `batches` to disk. +/// +/// Returns total number of the rows spilled to disk. +pub fn spill_record_batches( + batches: Vec, + path: PathBuf, + schema: SchemaRef, +) -> Result { + let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?; + for batch in batches { + writer.write(&batch)?; + } + writer.finish()?; + debug!( + "Spilled {} batches of total {} rows to disk, memory released {}", + writer.num_batches, + writer.num_rows, + human_readable_size(writer.num_bytes), + ); + Ok(writer.num_rows) +} + +fn read_spill(sender: Sender>, path: &Path) -> Result<()> { + let file = BufReader::new(File::open(path)?); + let reader = FileReader::try_new(file, None)?; + for batch in reader { + sender + .blocking_send(batch.map_err(Into::into)) + .map_err(|e| exec_datafusion_err!("{e}"))?; + } + Ok(()) +}