diff --git a/Cargo.toml b/Cargo.toml index ff231178a2b3..ec8e6621c2a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ exclude = ["datafusion-cli"] members = [ "datafusion/common", + "datafusion/common_runtime", "datafusion/core", "datafusion/expr", "datafusion/execution", @@ -72,6 +73,7 @@ ctor = "0.2.0" dashmap = "5.4.0" datafusion = { path = "datafusion/core", version = "36.0.0", default-features = false } datafusion-common = { path = "datafusion/common", version = "36.0.0", default-features = false } +datafusion-common-runtime = { path = "datafusion/common_runtime", version = "36.0.0" } datafusion-execution = { path = "datafusion/execution", version = "36.0.0" } datafusion-expr = { path = "datafusion/expr", version = "36.0.0" } datafusion-functions = { path = "datafusion/functions", version = "36.0.0" } diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 2379a30ce10f..40f9bd9dd4b5 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1112,6 +1112,7 @@ dependencies = [ "chrono", "dashmap", "datafusion-common", + "datafusion-common-runtime", "datafusion-execution", "datafusion-expr", "datafusion-functions", @@ -1193,6 +1194,13 @@ dependencies = [ "sqlparser", ] +[[package]] +name = "datafusion-common-runtime" +version = "36.0.0" +dependencies = [ + "tokio", +] + [[package]] name = "datafusion-execution" version = "36.0.0" @@ -1316,6 +1324,7 @@ dependencies = [ "async-trait", "chrono", "datafusion-common", + "datafusion-common-runtime", "datafusion-execution", "datafusion-expr", "datafusion-physical-expr", diff --git a/datafusion/common_runtime/Cargo.toml b/datafusion/common_runtime/Cargo.toml new file mode 100644 index 000000000000..7ed8b2cf2975 --- /dev/null +++ b/datafusion/common_runtime/Cargo.toml @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "datafusion-common-runtime" +description = "Common Runtime functionality for DataFusion query engine" +keywords = ["arrow", "query", "sql"] +readme = "README.md" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +license = { workspace = true } +authors = { workspace = true } +rust-version = { workspace = true } + +[lib] +name = "datafusion_common_runtime" +path = "src/lib.rs" + +[dependencies] +tokio = { workspace = true } diff --git a/datafusion/common_runtime/README.md b/datafusion/common_runtime/README.md new file mode 100644 index 000000000000..77100e52603c --- /dev/null +++ b/datafusion/common_runtime/README.md @@ -0,0 +1,26 @@ + + +# DataFusion Common Runtime + +[DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format. + +This crate is a submodule of DataFusion that provides common utilities. + +[df]: https://crates.io/crates/datafusion diff --git a/datafusion/common_runtime/src/common.rs b/datafusion/common_runtime/src/common.rs new file mode 100644 index 000000000000..88b74448c7a8 --- /dev/null +++ b/datafusion/common_runtime/src/common.rs @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::future::Future; + +use tokio::task::{JoinError, JoinSet}; + +/// Helper that provides a simple API to spawn a single task and join it. +/// Provides guarantees of aborting on `Drop` to keep it cancel-safe. +/// +/// Technically, it's just a wrapper of `JoinSet` (with size=1). +#[derive(Debug)] +pub struct SpawnedTask { + inner: JoinSet, +} + +impl SpawnedTask { + pub fn spawn(task: T) -> Self + where + T: Future, + T: Send + 'static, + R: Send, + { + let mut inner = JoinSet::new(); + inner.spawn(task); + Self { inner } + } + + pub fn spawn_blocking(task: T) -> Self + where + T: FnOnce() -> R, + T: Send + 'static, + R: Send, + { + let mut inner = JoinSet::new(); + inner.spawn_blocking(task); + Self { inner } + } + + pub async fn join(mut self) -> Result { + self.inner + .join_next() + .await + .expect("`SpawnedTask` instance always contains exactly 1 task") + } +} diff --git a/datafusion/common_runtime/src/lib.rs b/datafusion/common_runtime/src/lib.rs new file mode 100644 index 000000000000..e8624163f224 --- /dev/null +++ b/datafusion/common_runtime/src/lib.rs @@ -0,0 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod common; + +pub use common::SpawnedTask; diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 662d95a9323c..0c378d9d83f5 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -89,6 +89,7 @@ bzip2 = { version = "0.4.3", optional = true } chrono = { workspace = true } dashmap = { workspace = true } datafusion-common = { workspace = true, features = ["object_store"] } +datafusion-common-runtime = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } datafusion-functions = { workspace = true } diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index d7c31b9bd6b3..3bdf2af4552d 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1510,6 +1510,7 @@ mod tests { use arrow::array::{self, Int32Array}; use arrow::datatypes::DataType; use datafusion_common::{Constraint, Constraints}; + use datafusion_common_runtime::SpawnedTask; use datafusion_expr::{ avg, cast, count, count_distinct, create_udf, expr, lit, max, min, sum, BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, WindowFrame, @@ -2169,15 +2170,14 @@ mod tests { } #[tokio::test] - #[allow(clippy::disallowed_methods)] async fn sendable() { let df = test_table().await.unwrap(); // dataframes should be sendable between threads/tasks - let task = tokio::task::spawn(async move { + let task = SpawnedTask::spawn(async move { df.select_columns(&["c1"]) .expect("should be usable in a task") }); - task.await.expect("task completed successfully"); + task.join().await.expect("task completed successfully"); } #[tokio::test] diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 739850115370..4ea6c2a273f1 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -40,9 +40,9 @@ use arrow::datatypes::SchemaRef; use arrow::datatypes::{Fields, Schema}; use bytes::{BufMut, BytesMut}; use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType}; +use datafusion_common_runtime::SpawnedTask; use datafusion_execution::TaskContext; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; -use datafusion_physical_plan::common::SpawnedTask; use futures::{StreamExt, TryStreamExt}; use hashbrown::HashMap; use object_store::path::Path; diff --git a/datafusion/core/src/datasource/file_format/write/demux.rs b/datafusion/core/src/datasource/file_format/write/demux.rs index d70b4811da5b..396da96332f6 100644 --- a/datafusion/core/src/datasource/file_format/write/demux.rs +++ b/datafusion/core/src/datasource/file_format/write/demux.rs @@ -33,7 +33,7 @@ use arrow_array::{downcast_dictionary_array, RecordBatch, StringArray, StructArr use arrow_schema::{DataType, Schema}; use datafusion_common::cast::as_string_array; use datafusion_common::{exec_datafusion_err, DataFusionError}; - +use datafusion_common_runtime::SpawnedTask; use datafusion_execution::TaskContext; use futures::StreamExt; @@ -41,7 +41,6 @@ use object_store::path::Path; use rand::distributions::DistString; -use datafusion_physical_plan::common::SpawnedTask; use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender}; type RecordBatchReceiver = Receiver; diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs index 05406d3751c9..dd0e5ce6a40e 100644 --- a/datafusion/core/src/datasource/file_format/write/orchestration.rs +++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs @@ -30,10 +30,10 @@ use crate::physical_plan::SendableRecordBatchStream; use arrow_array::RecordBatch; use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError}; +use datafusion_common_runtime::SpawnedTask; use datafusion_execution::TaskContext; use bytes::Bytes; -use datafusion_physical_plan::common::SpawnedTask; use futures::try_join; use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc::{self, Receiver}; diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs index 6dc59e4a5c65..0d91b1cba34d 100644 --- a/datafusion/core/src/datasource/stream.rs +++ b/datafusion/core/src/datasource/stream.rs @@ -31,9 +31,9 @@ use async_trait::async_trait; use futures::StreamExt; use datafusion_common::{plan_err, Constraints, DataFusionError, Result}; +use datafusion_common_runtime::SpawnedTask; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::{CreateExternalTable, Expr, TableType}; -use datafusion_physical_plan::common::SpawnedTask; use datafusion_physical_plan::insert::{DataSink, FileSinkExec}; use datafusion_physical_plan::metrics::MetricsSet; use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder; diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 3aa4edfe3adc..2d964d29688c 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -2222,6 +2222,7 @@ mod tests { use crate::test_util::{plan_and_collect, populate_csv_partitions}; use crate::variable::VarType; use async_trait::async_trait; + use datafusion_common_runtime::SpawnedTask; use datafusion_expr::Expr; use std::env; use std::path::PathBuf; @@ -2321,7 +2322,6 @@ mod tests { } #[tokio::test] - #[allow(clippy::disallowed_methods)] async fn send_context_to_threads() -> Result<()> { // ensure SessionContexts can be used in a multi-threaded // environment. Usecase is for concurrent planing. @@ -2332,7 +2332,7 @@ mod tests { let threads: Vec<_> = (0..2) .map(|_| ctx.clone()) .map(|ctx| { - tokio::spawn(async move { + SpawnedTask::spawn(async move { // Ensure we can create logical plan code on a separate thread. ctx.sql("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3") .await @@ -2341,7 +2341,7 @@ mod tests { .collect(); for handle in threads { - handle.await.unwrap().unwrap(); + handle.join().await.unwrap().unwrap(); } Ok(()) } diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index d78d7a38a1c3..3fdb897ce1d1 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -480,6 +480,11 @@ pub use parquet; /// re-export of [`datafusion_common`] crate pub mod common { pub use datafusion_common::*; + + /// re-export of [`datafusion_common_runtime`] crate + pub mod runtime { + pub use datafusion_common_runtime::*; + } } // Backwards compatibility diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs index 1cab4d5c2f98..ee5e34bd703f 100644 --- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs @@ -30,6 +30,7 @@ use datafusion::physical_plan::windows::{ use datafusion::physical_plan::{collect, ExecutionPlan, InputOrderMode}; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_common::{Result, ScalarValue}; +use datafusion_common_runtime::SpawnedTask; use datafusion_expr::type_coercion::aggregates::coerce_types; use datafusion_expr::{ AggregateFunction, BuiltInWindowFunction, WindowFrame, WindowFrameBound, @@ -123,8 +124,7 @@ async fn window_bounded_window_random_comparison() -> Result<()> { for i in 0..n { let idx = i % test_cases.len(); let (pb_cols, ob_cols, search_mode) = test_cases[idx].clone(); - #[allow(clippy::disallowed_methods)] // spawn allowed only in tests - let job = tokio::spawn(run_window_test( + let job = SpawnedTask::spawn(run_window_test( make_staggered_batches::(1000, n_distinct, i as u64), i as u64, pb_cols, @@ -134,7 +134,7 @@ async fn window_bounded_window_random_comparison() -> Result<()> { handles.push(job); } for job in handles { - job.await.unwrap()?; + job.join().await.unwrap()?; } } Ok(()) diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index b4621109d2b1..72ee4fb3ef7e 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -43,6 +43,7 @@ arrow-schema = { workspace = true } async-trait = { workspace = true } chrono = { workspace = true } datafusion-common = { workspace = true, default-features = true } +datafusion-common-runtime = { workspace = true, default-features = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } datafusion-physical-expr = { workspace = true, default-features = true } diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs index 47cdf3e400e3..656bffd4a799 100644 --- a/datafusion/physical-plan/src/common.rs +++ b/datafusion/physical-plan/src/common.rs @@ -36,9 +36,8 @@ use datafusion_execution::memory_pool::MemoryReservation; use datafusion_physical_expr::expressions::{BinaryExpr, Column}; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; -use futures::{Future, StreamExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; use parking_lot::Mutex; -use tokio::task::{JoinError, JoinSet}; /// [`MemoryReservation`] used across query execution streams pub(crate) type SharedMemoryReservation = Arc>; @@ -172,46 +171,6 @@ pub fn compute_record_batch_statistics( } } -/// Helper that provides a simple API to spawn a single task and join it. -/// Provides guarantees of aborting on `Drop` to keep it cancel-safe. -/// -/// Technically, it's just a wrapper of `JoinSet` (with size=1). -#[derive(Debug)] -pub struct SpawnedTask { - inner: JoinSet, -} - -impl SpawnedTask { - pub fn spawn(task: T) -> Self - where - T: Future, - T: Send + 'static, - R: Send, - { - let mut inner = JoinSet::new(); - inner.spawn(task); - Self { inner } - } - - pub fn spawn_blocking(task: T) -> Self - where - T: FnOnce() -> R, - T: Send + 'static, - R: Send, - { - let mut inner = JoinSet::new(); - inner.spawn_blocking(task); - Self { inner } - } - - pub async fn join(mut self) -> Result { - self.inner - .join_next() - .await - .expect("`SpawnedTask` instance always contains exactly 1 task") - } -} - /// Transposes the given vector of vectors. pub fn transpose(original: Vec>) -> Vec> { match original.as_slice() { diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 006cd646b0ca..b527466493a8 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -264,7 +264,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// /// [`spawn`]: tokio::task::spawn /// [`JoinSet`]: tokio::task::JoinSet - /// [`SpawnedTask`]: crate::common::SpawnedTask + /// [`SpawnedTask`]: datafusion_common_runtime::SpawnedTask /// [`RecordBatchReceiverStreamBuilder`]: crate::stream::RecordBatchReceiverStreamBuilder /// /// # Implementation Examples diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index fe93ea131506..7ac70949f893 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -29,7 +29,7 @@ use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use super::{ DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream, }; -use crate::common::{transpose, SpawnedTask}; +use crate::common::transpose; use crate::hash_utils::create_hashes; use crate::metrics::BaselineMetrics; use crate::repartition::distributor_channels::{ @@ -42,6 +42,7 @@ use arrow::array::{ArrayRef, UInt64Builder}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::{arrow_datafusion_err, not_impl_err, DataFusionError, Result}; +use datafusion_common_runtime::SpawnedTask; use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr, PhysicalSortExpr}; @@ -946,7 +947,6 @@ mod tests { use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use futures::FutureExt; - use tokio::task::JoinHandle; #[tokio::test] async fn one_to_many_round_robin() -> Result<()> { @@ -1060,10 +1060,9 @@ mod tests { } #[tokio::test] - #[allow(clippy::disallowed_methods)] async fn many_to_many_round_robin_within_tokio_task() -> Result<()> { - let join_handle: JoinHandle>>> = - tokio::spawn(async move { + let handle: SpawnedTask>>> = + SpawnedTask::spawn(async move { // define input partitions let schema = test_schema(); let partition = create_vec_batches(50); @@ -1074,7 +1073,7 @@ mod tests { repartition(&schema, partitions, Partitioning::RoundRobinBatch(5)).await }); - let output_partitions = join_handle.await.unwrap().unwrap(); + let output_partitions = handle.join().await.unwrap().unwrap(); assert_eq!(5, output_partitions.len()); assert_eq!(30, output_partitions[0].len()); diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index f46958663252..5b0f2f354824 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -27,7 +27,7 @@ use std::io::BufReader; use std::path::{Path, PathBuf}; use std::sync::Arc; -use crate::common::{spawn_buffered, IPCWriter, SpawnedTask}; +use crate::common::{spawn_buffered, IPCWriter}; use crate::expressions::PhysicalSortExpr; use crate::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, @@ -46,6 +46,7 @@ use arrow::datatypes::SchemaRef; use arrow::ipc::reader::FileReader; use arrow::record_batch::RecordBatch; use datafusion_common::{exec_err, DataFusionError, Result}; +use datafusion_common_runtime::SpawnedTask; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::{ human_readable_size, MemoryConsumer, MemoryReservation, diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index 12c4c96d5236..c348f2cddc93 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -39,6 +39,7 @@ chrono = { workspace = true, optional = true } clap = { version = "4.4.8", features = ["derive", "env"] } datafusion = { workspace = true, default-features = true } datafusion-common = { workspace = true, default-features = true } +datafusion-common-runtime = { workspace = true, default-features = true } futures = { workspace = true } half = { workspace = true, default-features = true } itertools = { workspace = true } diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs index 41c33deec643..268d09681c72 100644 --- a/datafusion/sqllogictest/bin/sqllogictests.rs +++ b/datafusion/sqllogictest/bin/sqllogictests.rs @@ -28,6 +28,7 @@ use log::info; use sqllogictest::strict_column_validator; use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, Result}; +use datafusion_common_runtime::SpawnedTask; const TEST_DIRECTORY: &str = "test_files/"; const PG_COMPAT_FILE_PREFIX: &str = "pg_compat_"; @@ -88,8 +89,7 @@ async fn run_tests() -> Result<()> { // modifying shared state like `/tmp/`) let errors: Vec<_> = futures::stream::iter(read_test_files(&options)?) .map(|test_file| { - #[allow(clippy::disallowed_methods)] // spawn allowed only in tests - tokio::task::spawn(async move { + SpawnedTask::spawn(async move { println!("Running {:?}", test_file.relative_path); if options.complete { run_complete_file(test_file).await?; @@ -100,6 +100,7 @@ async fn run_tests() -> Result<()> { } Ok(()) as Result<()> }) + .join() }) // run up to num_cpus streams in parallel .buffer_unordered(num_cpus::get())