From e8c8e208c1f88b49b6f470683849a22d0d6044bd Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Tue, 19 Nov 2024 09:12:49 -0500 Subject: [PATCH] Add `DedicatedExecutor` to FlightSQL Server (#247) Add's a dedicated executor for running CPU bound work on the FlightSQL server. There is interest from the [DataFusion community](https://github.com/apache/datafusion/issues/13274#issuecomment-2468866602) for this, it was already on our [roadmap](https://github.com/datafusion-contrib/datafusion-dft/issues/197) and I think the DFT FlightSQL server is a great place to have a reference implementation. Initial inspiration and context can be found [here](https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/). Most of the initial implementation was copied from [here](https://github.com/influxdata/influxdb3_core/blob/6fcbb004232738d55655f32f4ad2385523d10696/executor/src/lib.rs) with some tweaks for our current setup. In particular we dont have metrics yet in the FlightSQL server implementation (but it is on the [roadmap](https://github.com/datafusion-contrib/datafusion-dft/issues/210)) - I expect to do a follow on where metrics are integrated. --- Cargo.lock | 2 + Cargo.toml | 2 + f.csv | 2 - src/args.rs | 2 +- src/config.rs | 18 + src/execution/executor/dedicated.rs | 785 ++++++++++++++++++++++++++++ src/execution/executor/io.rs | 131 +++++ src/execution/executor/mod.rs | 22 + src/execution/local.rs | 57 ++ src/execution/mod.rs | 1 + src/flightsql_server/mod.rs | 108 ++-- src/main.rs | 49 +- src/tui/mod.rs | 1 + 13 files changed, 1100 insertions(+), 80 deletions(-) delete mode 100644 f.csv create mode 100644 src/execution/executor/dedicated.rs create mode 100644 src/execution/executor/io.rs create mode 100644 src/execution/executor/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 88e6964..403350e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1598,7 +1598,9 @@ dependencies = [ "itertools 0.13.0", "lazy_static", "log", + "num_cpus", "object_store", + "parking_lot", "pin-project-lite", "predicates", "prost", diff --git a/Cargo.toml b/Cargo.toml index 6547050..c2c706d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,9 @@ http-body = {version = "0.4.5" } itertools = "0.13.0" lazy_static = "1.4.0" log = "0.4.22" +num_cpus = "1.16.0" object_store = { version = "0.10.2", features = ["aws"], optional = true } +parking_lot = "0.12.3" pin-project-lite = {version = "0.2.14" } prost = "0.12.3" ratatui = "0.28.0" diff --git a/f.csv b/f.csv deleted file mode 100644 index ec868dc..0000000 --- a/f.csv +++ /dev/null @@ -1,2 +0,0 @@ -query,runs,get_flight_info_min,get_flight_info_max,get_flight_info_mean,get_flight_info_median,get_flight_info_percent_of_total,ttfb_min,ttfb_max,ttfb,mean,ttfb_median,ttfb_percent_of_total,do_get_min,do_get_max,do_get_mean,do_get_median,do_get_percent_of_total,total_min,total_max,total_mean,total_median,total_percent_of_total -SELECT 1,10,7,28,11,9,33.44,9,127,22,10,66.29,9,127,22,10,66.43,17,139,33,19,100.00 diff --git a/src/args.rs b/src/args.rs index 997a75f..7d96c7a 100644 --- a/src/args.rs +++ b/src/args.rs @@ -91,7 +91,7 @@ pub struct DftArgs { #[clap(short = 'n', help = "Set the number of benchmark iterations to run")] pub benchmark_iterations: Option, - #[cfg(feature = "flightsql")] + #[cfg(any(feature = "flightsql", feature = "experimental-flightsql-server"))] #[clap(long, help = "Set the host and port to be used for FlightSQL")] pub flightsql_host: Option, } diff --git a/src/config.rs b/src/config.rs index b4bc72a..2b68486 100644 --- a/src/config.rs +++ b/src/config.rs @@ -172,6 +172,10 @@ pub struct ExecutionConfig { pub tui_batch_size: usize, #[serde(default = "default_flightsql_server_batch_size")] pub flightsql_server_batch_size: usize, + #[serde(default = "default_dedicated_executor_enabled")] + pub dedicated_executor_enabled: bool, + #[serde(default = "default_dedicated_executor_threads")] + pub dedicated_executor_threads: usize, } fn default_ddl_path() -> Option { @@ -204,6 +208,18 @@ fn default_flightsql_server_batch_size() -> usize { 8092 } +fn default_dedicated_executor_enabled() -> bool { + false +} + +fn default_dedicated_executor_threads() -> usize { + // By default we slightly over provision CPUs. For example, if you have N CPUs available we + // have N CPUs for the [`DedicatedExecutor`] and 1 for the main / IO runtime. + // + // Ref: https://github.com/datafusion-contrib/datafusion-dft/pull/247#discussion_r1848270250 + num_cpus::get() +} + impl Default for ExecutionConfig { fn default() -> Self { Self { @@ -213,6 +229,8 @@ impl Default for ExecutionConfig { cli_batch_size: default_cli_batch_size(), tui_batch_size: default_tui_batch_size(), flightsql_server_batch_size: default_flightsql_server_batch_size(), + dedicated_executor_enabled: default_dedicated_executor_enabled(), + dedicated_executor_threads: default_dedicated_executor_threads(), } } } diff --git a/src/execution/executor/dedicated.rs b/src/execution/executor/dedicated.rs new file mode 100644 index 0000000..92f3136 --- /dev/null +++ b/src/execution/executor/dedicated.rs @@ -0,0 +1,785 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + fmt::Display, + sync::{Arc, OnceLock}, + time::Duration, +}; + +use futures::{ + future::{BoxFuture, Shared}, + Future, FutureExt, TryFutureExt, +}; +use log::{info, warn}; +use parking_lot::RwLock; +use tokio::{ + runtime::Handle, + sync::{oneshot::error::RecvError, Notify}, + task::JoinSet, +}; + +use crate::config::ExecutionConfig; + +use super::io::register_io_runtime; + +const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60 * 5); + +/// Errors occurring when polling [`DedicatedExecutor::spawn`]. +#[derive(Debug)] +#[allow(missing_docs)] +pub enum JobError { + WorkerGone, + Panic { msg: String }, +} + +impl Display for JobError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + JobError::WorkerGone => { + write!(f, "Worker thread gone, executor was likely shut down") + } + JobError::Panic { msg } => write!(f, "Panic: {}", msg), + } + } +} + +/// Manages a separate tokio runtime (thread pool) for executing tasks. +/// +/// A `DedicatedExecutor` runs futures (and any `tasks` that are +/// `tokio::task::spawned` by them) on a separate tokio Executor +/// +/// # Background +/// +/// Tokio has the notion of the "current" runtime, which runs the current future +/// and any tasks spawned by it. Typically, this is the runtime created by +/// `tokio::main` and is used for the main application logic and I/O handling +/// +/// For CPU bound work, such as DataFusion plan execution, it is important to +/// run on a separate thread pool to avoid blocking the I/O handling for extended +/// periods of time in order to avoid long poll latencies (which decreases the +/// throughput of small requests under concurrent load). +/// +/// # IO Scheduling +/// +/// I/O, such as network calls, should not be performed on the runtime managed +/// by [`DedicatedExecutor`]. As tokio is a cooperative scheduler, long-running +/// CPU tasks will not be preempted and can therefore starve servicing of other +/// tasks. This manifests in long poll-latencies, where a task is ready to run +/// but isn't being scheduled to run. For CPU-bound work this isn't a problem as +/// there is no external party waiting on a response, however, for I/O tasks, +/// long poll latencies can prevent timely servicing of IO, which can have a +/// significant detrimental effect. +/// +/// # Details +/// +/// The worker thread priority is set to low so that such tasks do +/// not starve other more important tasks (such as answering health checks) +/// +/// Follows the example from stack overflow and spawns a new +/// thread to install a Tokio runtime "context" +/// +/// +/// # Trouble Shooting: +/// +/// ## "No IO runtime registered. Call `register_io_runtime`/`register_current_runtime_for_io` in current thread! +/// +/// This means that IO was attempted on a tokio runtime that was not registered +/// for IO. One solution is to run the task using [DedicatedExecutor::spawn]. +/// +/// ## "Cannot drop a runtime in a context where blocking is not allowed"` +/// +/// If you try to use this structure from an async context you see something like +/// thread 'plan::stringset::tests::test_builder_plan' panicked at 'Cannot +/// drop a runtime in a context where blocking is not allowed. This +/// happens when a runtime is dropped from within an asynchronous +/// context.', .../tokio-1.4.0/src/runtime/blocking/shutdown.rs:51:21 +#[derive(Clone)] +pub struct DedicatedExecutor { + state: Arc>, + + /// Used for testing. + /// + /// This will ignore explicit shutdown requests. + testing: bool, +} + +/// [`DedicatedExecutor`] for testing purposes. +static TESTING_EXECUTOR: OnceLock = OnceLock::new(); + +impl DedicatedExecutor { + /// Creates a new `DedicatedExecutor` with a dedicated tokio + /// executor that is separate from the threadpool created via + /// `[tokio::main]` or similar. + /// + /// See the documentation on [`DedicatedExecutor`] for more details. + /// + /// If [`DedicatedExecutor::new`] is called from an existing tokio runtime, + /// it will assume that the existing runtime should be used for I/O, and is + /// thus set, via [`register_io_runtime`] by all threads spawned by the + /// executor. This will allow scheduling IO outside the context of + /// [`DedicatedExecutor`] using [`spawn_io`]. + pub fn new( + name: &str, + config: ExecutionConfig, + runtime_builder: tokio::runtime::Builder, + ) -> Self { + Self::new_inner(name, config, runtime_builder, false) + } + + fn new_inner( + name: &str, + config: ExecutionConfig, + runtime_builder: tokio::runtime::Builder, + testing: bool, + ) -> Self { + let name = name.to_owned(); + + let notify_shutdown = Arc::new(Notify::new()); + let notify_shutdown_captured = Arc::clone(¬ify_shutdown); + + let (tx_shutdown, rx_shutdown) = tokio::sync::oneshot::channel(); + let (tx_handle, rx_handle) = std::sync::mpsc::channel(); + + let io_handle = tokio::runtime::Handle::try_current().ok(); + let thread = std::thread::Builder::new() + .name(format!("{name} driver")) + .spawn(move || { + // also register the IO runtime for the current thread, since it might be used as well (esp. for the + // current thread RT) + register_io_runtime(io_handle.clone()); + + info!( + "Creating DedicatedExecutor with {} threads", + config.dedicated_executor_threads + ); + + let mut runtime_builder = runtime_builder; + let runtime = runtime_builder + .worker_threads(config.dedicated_executor_threads) + .on_thread_start(move || register_io_runtime(io_handle.clone())) + .build() + .expect("Creating tokio runtime"); + + runtime.block_on(async move { + // Enable the "notified" receiver BEFORE sending the runtime handle back to the constructor thread + // (i.e .the one that runs `new`) to avoid the potential (but unlikely) race that the shutdown is + // started right after the constructor finishes and the new runtime calls + // `notify_shutdown_captured.notified().await`. + // + // Tokio provides an API for that by calling `enable` on the `notified` future (this requires + // pinning though). + let shutdown = notify_shutdown_captured.notified(); + let mut shutdown = std::pin::pin!(shutdown); + shutdown.as_mut().enable(); + + if tx_handle.send(Handle::current()).is_err() { + return; + } + shutdown.await; + }); + + runtime.shutdown_timeout(SHUTDOWN_TIMEOUT); + + // send shutdown "done" signal + tx_shutdown.send(()).ok(); + }) + .expect("executor setup"); + + let handle = rx_handle.recv().expect("driver started"); + + let state = State { + handle: Some(handle), + start_shutdown: notify_shutdown, + completed_shutdown: rx_shutdown.map_err(Arc::new).boxed().shared(), + thread: Some(thread), + }; + + Self { + state: Arc::new(RwLock::new(state)), + testing, + } + } + + /// Create new executor for testing purposes. + /// + /// Internal state may be shared with other tests. + pub fn new_testing() -> Self { + TESTING_EXECUTOR + .get_or_init(|| { + let mut runtime_builder = tokio::runtime::Builder::new_current_thread(); + + // only enable `time` but NOT the IO integration since IO shouldn't run on the DataFusion runtime + // See: + // - https://github.com/influxdata/influxdb_iox/issues/10803 + // - https://github.com/influxdata/influxdb_iox/pull/11030 + runtime_builder.enable_time(); + + Self::new_inner("testing", ExecutionConfig::default(), runtime_builder, true) + }) + .clone() + } + + /// Runs the specified [`Future`] (and any tasks it spawns) on the thread + /// pool managed by this `DedicatedExecutor`. + /// + /// # Notes + /// + /// UNLIKE [`tokio::task::spawn`], the returned future is **cancelled** when + /// it is dropped. Thus, you need ensure the returned future lives until it + /// completes (call `await`) or you wish to cancel it. + /// + /// Currently all tasks are added to the tokio executor immediately and + /// compete for the threadpool's resources. + pub fn spawn(&self, task: T) -> impl Future> + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + let handle = { + let state = self.state.read(); + state.handle.clone() + }; + + let Some(handle) = handle else { + return futures::future::err(JobError::WorkerGone).boxed(); + }; + + // use JoinSet implement "cancel on drop" + let mut join_set = JoinSet::new(); + join_set.spawn_on(task, &handle); + async move { + join_set + .join_next() + .await + .expect("just spawned task") + .map_err(|e| match e.try_into_panic() { + Ok(e) => { + let s = if let Some(s) = e.downcast_ref::() { + s.clone() + } else if let Some(s) = e.downcast_ref::<&str>() { + s.to_string() + } else { + "unknown internal error".to_string() + }; + + JobError::Panic { msg: s } + } + Err(_) => JobError::WorkerGone, + }) + } + .boxed() + } + + /// signals shutdown of this executor and any Clones + pub fn shutdown(&self) { + if self.testing { + return; + } + + // hang up the channel which will cause the dedicated thread + // to quit + let mut state = self.state.write(); + state.handle = None; + state.start_shutdown.notify_one(); + } + + /// Stops all subsequent task executions, and waits for the worker + /// thread to complete. Note this will shutdown all clones of this + /// `DedicatedExecutor` as well. + /// + /// Only the first all to `join` will actually wait for the + /// executing thread to complete. All other calls to join will + /// complete immediately. + /// + /// # Panic / Drop + /// [`DedicatedExecutor`] implements shutdown on [`Drop`]. You should just use this behavior and NOT call + /// [`join`](Self::join) manually during [`Drop`] or panics because this might lead to another panic, see + /// . + pub async fn join(&self) { + if self.testing { + return; + } + + self.shutdown(); + + // get handle mutex is held + let handle = { + let state = self.state.read(); + state.completed_shutdown.clone() + }; + + // wait for completion while not holding the mutex to avoid + // deadlocks + handle.await.expect("Thread died?") + } +} + +/// Runs futures (and any `tasks` that are `tokio::task::spawned` by +/// them) on a separate tokio Executor. +/// +/// The state is only used by the "outer" API, not by the newly created runtime. The new runtime waits for +/// [`start_shutdown`](Self::start_shutdown) and signals the completion via +/// [`completed_shutdown`](Self::completed_shutdown) (for which is owns the sender side). +struct State { + /// Runtime handle. + /// + /// This is `None` when the executor is shutting down. + handle: Option, + + /// If notified, the executor tokio runtime will begin to shutdown. + /// + /// We could implement this by checking `handle.is_none()` in regular intervals but requires regular wake-ups and + /// locking of the state. Just using a proper async signal is nicer. + start_shutdown: Arc, + + /// Receiver side indicating that shutdown is complete. + completed_shutdown: Shared>>>, + + /// The inner thread that can be used to join during drop. + thread: Option>, +} + +// IMPORTANT: Implement `Drop` for `State`, NOT for `DedicatedExecutor`, because the executor can be cloned and clones +// share their inner state. +impl Drop for State { + fn drop(&mut self) { + if self.handle.is_some() { + warn!("DedicatedExecutor dropped without calling shutdown()"); + self.handle = None; + self.start_shutdown.notify_one(); + } + + // do NOT poll the shared future if we are panicking due to https://github.com/rust-lang/futures-rs/issues/2575 + if !std::thread::panicking() && self.completed_shutdown.clone().now_or_never().is_none() { + warn!("DedicatedExecutor dropped without waiting for worker termination",); + } + + // join thread but don't care about the results + self.thread.take().expect("not dropped yet").join().ok(); + } +} + +#[cfg(test)] +mod tests { + use crate::execution::executor::io::spawn_io; + + use super::*; + use std::{ + panic::panic_any, + sync::{Arc, Barrier}, + time::Duration, + }; + use tokio::{net::TcpListener, sync::Barrier as AsyncBarrier}; + + /// Wait for the barrier and then return `result` + async fn do_work(result: usize, barrier: Arc) -> usize { + barrier.wait(); + result + } + + /// Wait for the barrier and then return `result` + async fn do_work_async(result: usize, barrier: Arc) -> usize { + barrier.wait().await; + result + } + + fn exec() -> DedicatedExecutor { + exec_with_threads(1) + } + + fn exec2() -> DedicatedExecutor { + exec_with_threads(2) + } + + fn exec_with_threads(threads: usize) -> DedicatedExecutor { + let mut runtime_builder = tokio::runtime::Builder::new_multi_thread(); + runtime_builder.worker_threads(threads); + runtime_builder.enable_all(); + + DedicatedExecutor::new( + "Test DedicatedExecutor", + ExecutionConfig::default(), + runtime_builder, + ) + } + + async fn test_io_runtime_multi_thread_impl(dedicated: DedicatedExecutor) { + let io_runtime_id = std::thread::current().id(); + dedicated + .spawn(async move { + let dedicated_id = std::thread::current().id(); + let spawned = spawn_io(async move { std::thread::current().id() }).await; + + assert_ne!(dedicated_id, spawned); + assert_eq!(io_runtime_id, spawned); + }) + .await + .unwrap(); + } + + #[tokio::test] + async fn basic() { + let barrier = Arc::new(Barrier::new(2)); + + let exec = exec(); + let dedicated_task = exec.spawn(do_work(42, Arc::clone(&barrier))); + + // Note the dedicated task will never complete if it runs on + // the main tokio thread (as this test is not using the + // 'multithreaded' version of the executor and the call to + // barrier.wait actually blocks the tokio thread) + barrier.wait(); + + // should be able to get the result + assert_eq!(dedicated_task.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + async fn basic_clone() { + let barrier = Arc::new(Barrier::new(2)); + let exec = exec(); + // Run task on clone should work fine + let dedicated_task = exec.clone().spawn(do_work(42, Arc::clone(&barrier))); + barrier.wait(); + assert_eq!(dedicated_task.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + async fn drop_empty_exec() { + exec(); + } + + #[tokio::test] + async fn drop_clone() { + let barrier = Arc::new(Barrier::new(2)); + let exec = exec(); + + drop(exec.clone()); + + let task = exec.spawn(do_work(42, Arc::clone(&barrier))); + barrier.wait(); + assert_eq!(task.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + #[should_panic(expected = "foo")] + async fn just_panic() { + struct S(DedicatedExecutor); + + impl Drop for S { + fn drop(&mut self) { + self.0.join().now_or_never(); + } + } + + let exec = exec(); + let _s = S(exec); + + // this must not lead to a double-panic and SIGILL + panic!("foo") + } + + #[tokio::test] + async fn multi_task() { + let barrier = Arc::new(Barrier::new(3)); + + // make an executor with two threads + let exec = exec2(); + let dedicated_task1 = exec.spawn(do_work(11, Arc::clone(&barrier))); + let dedicated_task2 = exec.spawn(do_work(42, Arc::clone(&barrier))); + + // block main thread until completion of other two tasks + barrier.wait(); + + // should be able to get the result + assert_eq!(dedicated_task1.await.unwrap(), 11); + assert_eq!(dedicated_task2.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + async fn tokio_spawn() { + let exec = exec2(); + + // spawn a task that spawns to other tasks and ensure they run on the dedicated + // executor + let dedicated_task = exec.spawn(async move { + // spawn separate tasks + let t1 = tokio::task::spawn(async { 25usize }); + t1.await.unwrap() + }); + + // Validate the inner task ran to completion (aka it did not panic) + assert_eq!(dedicated_task.await.unwrap(), 25); + + exec.join().await; + } + + #[tokio::test] + async fn panic_on_executor_str() { + let exec = exec(); + let dedicated_task = exec.spawn(async move { + if true { + panic!("At the disco, on the dedicated task scheduler"); + } else { + 42 + } + }); + + // should not be able to get the result + let err = dedicated_task.await.unwrap_err(); + assert_eq!( + err.to_string(), + "Panic: At the disco, on the dedicated task scheduler", + ); + + exec.join().await; + } + + #[tokio::test] + async fn panic_on_executor_string() { + let exec = exec(); + let dedicated_task = exec.spawn(async move { + if true { + panic!("{} {}", 1, 2); + } else { + 42 + } + }); + + // should not be able to get the result + let err = dedicated_task.await.unwrap_err(); + assert_eq!(err.to_string(), "Panic: 1 2",); + + exec.join().await; + } + + #[tokio::test] + async fn panic_on_executor_other() { + let exec = exec(); + let dedicated_task = exec.spawn(async move { + if true { + panic_any(1) + } else { + 42 + } + }); + + // should not be able to get the result + let err = dedicated_task.await.unwrap_err(); + assert_eq!(err.to_string(), "Panic: unknown internal error",); + + exec.join().await; + } + + #[tokio::test] + async fn executor_shutdown_while_task_running() { + let barrier_1 = Arc::new(Barrier::new(2)); + let captured_1 = Arc::clone(&barrier_1); + let barrier_2 = Arc::new(Barrier::new(2)); + let captured_2 = Arc::clone(&barrier_2); + + let exec = exec(); + let dedicated_task = exec.spawn(async move { + captured_1.wait(); + do_work(42, captured_2).await + }); + barrier_1.wait(); + + exec.shutdown(); + // block main thread until completion of the outstanding task + barrier_2.wait(); + + // task should complete successfully + assert_eq!(dedicated_task.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + async fn executor_submit_task_after_shutdown() { + let exec = exec(); + + // Simulate trying to submit tasks once executor has shutdown + exec.shutdown(); + let dedicated_task = exec.spawn(async { 11 }); + + // task should complete, but return an error + let err = dedicated_task.await.unwrap_err(); + assert_eq!( + err.to_string(), + "Worker thread gone, executor was likely shut down" + ); + + exec.join().await; + } + + #[tokio::test] + async fn executor_submit_task_after_clone_shutdown() { + let exec = exec(); + + // shutdown the clone (but not the exec) + exec.clone().join().await; + + // Simulate trying to submit tasks once executor has shutdown + let dedicated_task = exec.spawn(async { 11 }); + + // task should complete, but return an error + let err = dedicated_task.await.unwrap_err(); + assert_eq!( + err.to_string(), + "Worker thread gone, executor was likely shut down" + ); + + exec.join().await; + } + + #[tokio::test] + async fn executor_join() { + let exec = exec(); + // test it doesn't hang + exec.join().await; + } + + #[tokio::test] + async fn executor_join2() { + let exec = exec(); + // test it doesn't hang + exec.join().await; + exec.join().await; + } + + #[tokio::test] + #[allow(clippy::redundant_clone)] + async fn executor_clone_join() { + let exec = exec(); + // test it doesn't hang + exec.clone().join().await; + exec.clone().join().await; + exec.join().await; + } + + #[tokio::test] + async fn drop_receiver() { + // create empty executor + let exec = exec(); + + // create first blocked task + let barrier1_pre = Arc::new(AsyncBarrier::new(2)); + let barrier1_pre_captured = Arc::clone(&barrier1_pre); + let barrier1_post = Arc::new(AsyncBarrier::new(2)); + let barrier1_post_captured = Arc::clone(&barrier1_post); + let dedicated_task1 = exec.spawn(async move { + barrier1_pre_captured.wait().await; + do_work_async(11, barrier1_post_captured).await + }); + barrier1_pre.wait().await; + + // create second blocked task + let barrier2_pre = Arc::new(AsyncBarrier::new(2)); + let barrier2_pre_captured = Arc::clone(&barrier2_pre); + let barrier2_post = Arc::new(AsyncBarrier::new(2)); + let barrier2_post_captured = Arc::clone(&barrier2_post); + let dedicated_task2 = exec.spawn(async move { + barrier2_pre_captured.wait().await; + do_work_async(22, barrier2_post_captured).await + }); + barrier2_pre.wait().await; + + // cancel task + drop(dedicated_task1); + + // cancelation might take a short while + tokio::time::timeout(Duration::from_secs(1), async { + loop { + if Arc::strong_count(&barrier1_post) == 1 { + return; + } + tokio::time::sleep(Duration::from_millis(10)).await + } + }) + .await + .unwrap(); + + // unblock other task + barrier2_post.wait().await; + assert_eq!(dedicated_task2.await.unwrap(), 22); + tokio::time::timeout(Duration::from_secs(1), async { + loop { + if Arc::strong_count(&barrier2_post) == 1 { + return; + } + tokio::time::sleep(Duration::from_millis(10)).await + } + }) + .await + .unwrap(); + + exec.join().await; + } + + #[tokio::test] + async fn test_io_runtime_multi_thread() { + let mut runtime_builder = tokio::runtime::Builder::new_multi_thread(); + runtime_builder.worker_threads(1); + + let dedicated = DedicatedExecutor::new( + "Test DedicatedExecutor", + ExecutionConfig::default(), + runtime_builder, + ); + test_io_runtime_multi_thread_impl(dedicated).await; + } + + #[tokio::test] + async fn test_io_runtime_current_thread() { + let runtime_builder = tokio::runtime::Builder::new_current_thread(); + + let dedicated = DedicatedExecutor::new( + "Test DedicatedExecutor", + ExecutionConfig::default(), + runtime_builder, + ); + test_io_runtime_multi_thread_impl(dedicated).await; + } + + #[tokio::test] + async fn test_that_testing_executor_prevents_io() { + let exec = DedicatedExecutor::new_testing(); + + let io_disabled = exec + .spawn(async move { + // the only way (I've found) to test if IO is enabled is to use it and observer if tokio panics + TcpListener::bind("127.0.0.1:0") + .catch_unwind() + .await + .is_err() + }) + .await + .unwrap(); + + assert!(io_disabled) + } +} diff --git a/src/execution/executor/io.rs b/src/execution/executor/io.rs new file mode 100644 index 0000000..aa18971 --- /dev/null +++ b/src/execution/executor/io.rs @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + cell::RefCell, + pin::Pin, + task::{Context, Poll}, +}; + +use futures::{Future, FutureExt}; +use tokio::{runtime::Handle, task::JoinHandle}; + +thread_local! { + /// Tokio runtime `Handle` for doing network (I/O) operations, see [`spawn_io`] + pub static IO_RUNTIME: RefCell> = const { RefCell::new(None) }; +} + +/// Registers `handle` as the IO runtime for this thread +/// +/// See [`spawn_io`] +pub fn register_io_runtime(handle: Option) { + IO_RUNTIME.set(handle) +} + +/// Runs `fut` on the runtime registered by [`register_io_runtime`] if any, +/// otherwise awaits on the current thread +/// +/// # Panic +/// Needs a IO runtime [registered](register_io_runtime). +pub async fn spawn_io(fut: Fut) -> Fut::Output +where + Fut: Future + Send + 'static, + Fut::Output: Send, +{ + let h = IO_RUNTIME.with_borrow(|h| h.clone()).expect( + "No IO runtime registered. If you hit this panic, it likely \ + means a DataFusion plan or other CPU bound work is running on the \ + a tokio threadpool used for IO. Try spawning the work using \ + `DedicatedExcutor::spawn` or for tests `register_current_runtime_for_io`", + ); + DropGuard(h.spawn(fut)).await +} + +struct DropGuard(JoinHandle); +impl Drop for DropGuard { + fn drop(&mut self) { + self.0.abort() + } +} + +impl Future for DropGuard { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Poll::Ready(match std::task::ready!(self.0.poll_unpin(cx)) { + Ok(v) => v, + Err(e) if e.is_cancelled() => panic!("IO runtime was shut down"), + Err(e) => std::panic::resume_unwind(e.into_panic()), + }) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + + #[tokio::test] + async fn test_happy_path() { + let rt_io = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap(); + + let io_thread_id = rt_io + .spawn(async move { std::thread::current().id() }) + .await + .unwrap(); + let parent_thread_id = std::thread::current().id(); + assert_ne!(io_thread_id, parent_thread_id); + + register_io_runtime(Some(rt_io.handle().clone())); + + let measured_thread_id = spawn_io(async move { std::thread::current().id() }).await; + assert_eq!(measured_thread_id, io_thread_id); + + rt_io.shutdown_background(); + } + + #[tokio::test] + #[should_panic(expected = "IO runtime registered")] + async fn test_panic_if_no_runtime_registered() { + spawn_io(futures::future::ready(())).await; + } + + #[tokio::test] + #[should_panic(expected = "IO runtime was shut down")] + async fn test_io_runtime_down() { + let rt_io = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap(); + + register_io_runtime(Some(rt_io.handle().clone())); + + tokio::task::spawn_blocking(move || { + rt_io.shutdown_timeout(Duration::from_secs(1)); + }) + .await + .unwrap(); + + spawn_io(futures::future::ready(())).await; + } +} diff --git a/src/execution/executor/mod.rs b/src/execution/executor/mod.rs new file mode 100644 index 0000000..b0f6592 --- /dev/null +++ b/src/execution/executor/mod.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// A majority of the code in this module was copied from InfluxDB +/// +/// Ref: https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor +pub mod dedicated; +pub mod io; diff --git a/src/execution/local.rs b/src/execution/local.rs index b6e642b..472b72a 100644 --- a/src/execution/local.rs +++ b/src/execution/local.rs @@ -21,6 +21,9 @@ use std::io::Write; use std::path::PathBuf; use std::sync::Arc; +use color_eyre::eyre::eyre; +use datafusion::logical_expr::LogicalPlan; +use futures::TryFutureExt; use log::{debug, error, info}; use crate::config::ExecutionConfig; @@ -32,6 +35,7 @@ use datafusion::prelude::*; use datafusion::sql::parser::{DFParser, Statement}; use tokio_stream::StreamExt; +use super::executor::dedicated::DedicatedExecutor; use super::local_benchmarks::LocalBenchmarkStats; use super::stats::{ExecutionDurationStats, ExecutionStats}; use super::AppType; @@ -52,8 +56,12 @@ use super::AppType; #[derive(Clone)] pub struct ExecutionContext { config: ExecutionConfig, + /// Underlying `SessionContext` session_ctx: SessionContext, + /// Path to the configured DDL file ddl_path: Option, + /// Dedicated executor for running CPU intensive work + executor: Option, } impl std::fmt::Debug for ExecutionContext { @@ -66,6 +74,7 @@ impl ExecutionContext { /// Construct a new `ExecutionContext` with the specified configuration pub fn try_new(config: &ExecutionConfig, app_type: AppType) -> Result { let mut builder = DftSessionStateBuilder::new(); + let mut executor = None; match app_type { AppType::Cli => { builder = builder.with_batch_size(config.cli_batch_size); @@ -75,6 +84,16 @@ impl ExecutionContext { } AppType::FlightSQLServer => { builder = builder.with_batch_size(config.flightsql_server_batch_size); + if config.dedicated_executor_enabled { + // Ideally we would only use `enable_time` but we are still doing + // some network requests as part of planning / execution which require network + // functionality. + + let runtime_builder = tokio::runtime::Builder::new_multi_thread(); + let dedicated_executor = + DedicatedExecutor::new("cpu_runtime", config.clone(), runtime_builder); + executor = Some(dedicated_executor) + } } } let extensions = enabled_extensions(); @@ -101,6 +120,7 @@ impl ExecutionContext { config: config.clone(), session_ctx, ddl_path: config.ddl_path.as_ref().map(PathBuf::from), + executor, }) } @@ -117,6 +137,43 @@ impl ExecutionContext { &self.session_ctx } + /// Return the inner [`DedicatedExecutor`] + pub fn executor(&self) -> &Option { + &self.executor + } + + /// Convert the statement to a `LogicalPlan`. Uses the [`DedicatedExecutor`] if it is available. + pub async fn statement_to_logical_plan(&self, statement: Statement) -> Result { + let ctx = self.session_ctx.clone(); + let task = async move { ctx.state().statement_to_plan(statement).await }; + if let Some(executor) = &self.executor { + let job = executor.spawn(task).map_err(|e| eyre::eyre!(e)); + let job_res = job.await?; + job_res.map_err(|e| eyre!(e)) + } else { + task.await.map_err(|e| eyre!(e)) + } + } + + /// Executes the provided `LogicalPlan` returning a `SendableRecordBatchStream`. Uses the [`DedicatedExecutor`] if it is available. + pub async fn execute_logical_plan( + &self, + logical_plan: LogicalPlan, + ) -> Result { + let ctx = self.session_ctx.clone(); + let task = async move { + let df = ctx.execute_logical_plan(logical_plan).await?; + df.execute_stream().await + }; + if let Some(executor) = &self.executor { + let job = executor.spawn(task).map_err(|e| eyre!(e)); + let job_res = job.await?; + job_res.map_err(|e| eyre!(e)) + } else { + task.await.map_err(|e| eyre!(e)) + } + } + /// Executes the specified sql string, driving it to completion but discarding any results pub async fn execute_sql_and_discard_results( &self, diff --git a/src/execution/mod.rs b/src/execution/mod.rs index 90b7c8d..c96b651 100644 --- a/src/execution/mod.rs +++ b/src/execution/mod.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +pub mod executor; #[cfg(feature = "flightsql")] pub mod flightsql; #[cfg(feature = "flightsql")] diff --git a/src/flightsql_server/mod.rs b/src/flightsql_server/mod.rs index ff8cb34..1717451 100644 --- a/src/flightsql_server/mod.rs +++ b/src/flightsql_server/mod.rs @@ -59,13 +59,8 @@ impl FlightSqlServiceImpl { // wrap up tonic goop FlightServiceServer::new(self.clone()) } -} - -#[tonic::async_trait] -impl FlightSqlService for FlightSqlServiceImpl { - type FlightService = FlightSqlServiceImpl; - async fn get_flight_info_statement( + async fn get_flight_info_statement_handler( &self, query: CommandStatementQuery, request: Request, @@ -78,13 +73,10 @@ impl FlightSqlService for FlightSqlServiceImpl { Ok(statements) => { let statement = statements[0].clone(); let start = std::time::Instant::now(); - match self - .execution - .session_ctx() - .state() - .statement_to_plan(statement) - .await - { + + let logical_plan = self.execution.statement_to_logical_plan(statement).await; + + match logical_plan { Ok(logical_plan) => { debug!("logical planning took: {:?}", start.elapsed()); let schema = logical_plan.schema(); @@ -118,26 +110,18 @@ impl FlightSqlService for FlightSqlServiceImpl { } Err(e) => { error!("Error planning SQL query: {:?}", e); - return Err(Status::internal("Error planning SQL query")); + Err(Status::internal("Error planning SQL query")) } } } Err(e) => { error!("Error parsing SQL query: {:?}", e); - return Err(Status::internal("Error parsing SQL query")); + Err(Status::internal("Error parsing SQL query")) } } } - async fn do_get_statement( - &self, - _ticket: TicketStatementQuery, - _request: Request, - ) -> Result::DoGetStream>, Status> { - Err(Status::unimplemented("Not implemented")) - } - - async fn do_get_fallback( + async fn do_get_fallback_handler( &self, request: Request, message: Any, @@ -162,35 +146,20 @@ impl FlightSqlService for FlightSqlServiceImpl { guard.get(&id).cloned() }; if let Some(plan) = maybe_plan { - match self + let stream = self .execution - .session_ctx() .execute_logical_plan(plan) .await - { - Ok(df) => { - let stream = df - .execute_stream() - .await - .map_err(|_| { - Status::internal("Error executing plan") - })? - .map_err(|e| { - FlightError::ExternalError(Box::new(e)) - }); - let builder = FlightDataEncoderBuilder::new(); - let flight_data_stream = builder.build(stream); - let b = flight_data_stream - .map_err(|_| Status::internal("Hi")) - .boxed(); - let res = Response::new(b); - Ok(res) - } - Err(e) => { - error!("error executing plan: {:?}", e); - Err(Status::internal("Error executing plan")) - } - } + .map_err(|e| Status::internal(e.to_string()))?; + let builder = FlightDataEncoderBuilder::new(); + let flight_data_stream = + builder + .build(stream.map_err(|e| { + FlightError::ExternalError(Box::new(e)) + })) + .map_err(|e| Status::internal(e.to_string())) + .boxed(); + Ok(Response::new(flight_data_stream)) } else { Err(Status::internal("Plan not found for id")) } @@ -213,6 +182,35 @@ impl FlightSqlService for FlightSqlServiceImpl { } } } +} + +#[tonic::async_trait] +impl FlightSqlService for FlightSqlServiceImpl { + type FlightService = FlightSqlServiceImpl; + + async fn get_flight_info_statement( + &self, + query: CommandStatementQuery, + request: Request, + ) -> Result, Status> { + self.get_flight_info_statement_handler(query, request).await + } + + async fn do_get_statement( + &self, + _ticket: TicketStatementQuery, + _request: Request, + ) -> Result::DoGetStream>, Status> { + Err(Status::unimplemented("Not implemented")) + } + + async fn do_get_fallback( + &self, + request: Request, + message: Any, + ) -> Result::DoGetStream>, Status> { + self.do_get_fallback_handler(request, message).await + } async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {} } @@ -270,16 +268,6 @@ impl FlightSqlApp { } } - // pub async fn channel(&self) -> Channel { - // let url = format!("http://{}", self.addr); - // let uri: Uri = url.parse().expect("Valid URI"); - // Channel::builder(uri) - // .timeout(Duration::from_secs(DEFAULT_TIMEOUT_SECONDS)) - // .connect() - // .await - // .expect("error connecting to server") - // } - /// Stops the server and waits for the server to shutdown pub async fn shutdown_and_wait(mut self) { if let Some(shutdown) = self.shutdown.take() { diff --git a/src/main.rs b/src/main.rs index 5d71f45..0d6bb38 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,25 +22,41 @@ use dft::cli::CliApp; #[cfg(feature = "flightsql")] use dft::execution::flightsql::FlightSQLContext; use dft::execution::{local::ExecutionContext, AppExecution, AppType}; +#[cfg(feature = "experimental-flightsql-server")] +use dft::flightsql_server::{FlightSqlApp, FlightSqlServiceImpl}; use dft::telemetry; +use dft::tui::state::AppState; use dft::tui::{state, App}; -#[cfg(feature = "experimental-flightsql-server")] -use { - dft::flightsql_server::{FlightSqlApp, FlightSqlServiceImpl}, - log::info, -}; +use log::info; #[allow(unused_mut)] -#[tokio::main] -async fn main() -> Result<()> { +fn main() -> Result<()> { let cli = DftArgs::parse(); + if !cli.files.is_empty() || !cli.commands.is_empty() || cli.serve { + env_logger::init(); + } + + let state = state::initialize(cli.config_path()); + + // With Runtimes configured correctly the main Tokio runtime should only be used for network + // IO, in which a single thread should be sufficient. + // + // Ref: https://github.com/datafusion-contrib/datafusion-dft/pull/247#discussion_r1848270250 + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build()?; + + let entry_point = app_entry_point(cli, state); + runtime.block_on(entry_point) +} + +async fn app_entry_point(cli: DftArgs, state: AppState<'_>) -> Result<()> { // CLI mode: executing commands from files or CLI arguments if !cli.files.is_empty() || !cli.commands.is_empty() { - // use env_logger to setup logging for CLI - env_logger::init(); - let state = state::initialize(cli.config_path()); let execution_ctx = ExecutionContext::try_new(&state.config.execution, AppType::Cli)?; + #[allow(unused_mut)] let mut app_execution = AppExecution::new(execution_ctx); #[cfg(feature = "flightsql")] { @@ -54,11 +70,15 @@ async fn main() -> Result<()> { } let app = CliApp::new(app_execution, cli.clone()); app.execute_files_or_commands().await?; + // FlightSQL Server mode: start a FlightSQL server } else if cli.serve { + #[cfg(not(feature = "experimental-flightsql-server"))] + { + panic!("FlightSQL feature is not enabled"); + } #[cfg(feature = "experimental-flightsql-server")] { const DEFAULT_SERVER_ADDRESS: &str = "127.0.0.1:50051"; - env_logger::init(); info!("Starting FlightSQL server on {}", DEFAULT_SERVER_ADDRESS); let state = state::initialize(cli.config_path()); let execution_ctx = @@ -76,13 +96,8 @@ async fn main() -> Result<()> { .await; app.run_app().await; } - - #[cfg(not(feature = "flightsql"))] - { - panic!("FlightSQL feature is not enabled"); - } } - // UI mode: running the TUI + // TUI mode: running the TUI else { // use alternate logging for TUI telemetry::initialize_logs()?; diff --git a/src/tui/mod.rs b/src/tui/mod.rs index f6681b8..0c17d3d 100644 --- a/src/tui/mod.rs +++ b/src/tui/mod.rs @@ -85,6 +85,7 @@ pub enum AppEvent { FlightSQLConnected, } +#[allow(dead_code)] pub struct App<'app> { state: state::AppState<'app>, execution: Arc,