From 74ca8ddb66624782ed552d03ff61f5186d850e70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Tue, 26 Nov 2024 21:36:38 +0000 Subject: [PATCH] feat: Add option to limit number of executor worker threads. This option is similar to `spark.executor.cores`: it sets the number of tokio worker threads. --- ballista/executor/Cargo.toml | 8 +------- ballista/executor/executor_config_spec.toml | 7 ++++++- ballista/executor/src/bin/main.rs | 19 ++++++++++++++++--- ballista/scheduler/src/bin/main.rs | 1 + 4 files changed, 24 insertions(+), 11 deletions(-) diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml index 0f5a62d405..12d9156358 100644 --- a/ballista/executor/Cargo.toml +++ b/ballista/executor/Cargo.toml @@ -51,13 +51,7 @@ log = { workspace = true } mimalloc = { workspace = true, default-features = false, optional = true } parking_lot = { workspace = true } tempfile = { workspace = true } -tokio = { workspace = true, features = [ - "macros", - "rt", - "rt-multi-thread", - "parking_lot", - "signal", -] } +tokio = { workspace = true, features = ["full"] } tokio-stream = { workspace = true, features = ["net"] } tonic = { workspace = true } tracing = { workspace = true } diff --git a/ballista/executor/executor_config_spec.toml b/ballista/executor/executor_config_spec.toml index 209069de1c..389d7f44b9 100644 --- a/ballista/executor/executor_config_spec.toml +++ b/ballista/executor/executor_config_spec.toml @@ -164,4 +164,9 @@ default = "1073741824" name = "cache_io_concurrency" type = "u32" doc = "The number of worker threads for the runtime of caching. Default: 2" -default = "2" \ No newline at end of file +default = "2" + +[[param]] +name = "executor_cores" +type = "usize" +doc = "The number of worker threads. 
Default: number of available cores" \ No newline at end of file diff --git a/ballista/executor/src/bin/main.rs b/ballista/executor/src/bin/main.rs index 5ef88e8bf2..96383cf8f9 100644 --- a/ballista/executor/src/bin/main.rs +++ b/ballista/executor/src/bin/main.rs @@ -32,13 +32,26 @@ use tracing_subscriber::EnvFilter; #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; -#[tokio::main] -async fn main() -> Result<()> { - // parse command-line arguments +fn main() -> Result<()> { let (opt, _remaining_args) = Config::including_optional_config_files(&["/etc/ballista/executor.toml"]) .unwrap_or_exit(); + let executor_cores = opt + .executor_cores + .unwrap_or_else(|| std::thread::available_parallelism().unwrap().get()); + + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_name("ballista_executor") + .worker_threads(executor_cores) + .build() + .unwrap(); + + runtime.block_on(inner(opt)) +} + +async fn inner(opt: Config) -> Result<()> { if opt.version { print_version(); std::process::exit(0); diff --git a/ballista/scheduler/src/bin/main.rs b/ballista/scheduler/src/bin/main.rs index f6a0632840..81415fbf8c 100644 --- a/ballista/scheduler/src/bin/main.rs +++ b/ballista/scheduler/src/bin/main.rs @@ -31,6 +31,7 @@ fn main() -> Result<()> { let runtime = tokio::runtime::Builder::new_multi_thread() .enable_io() .enable_time() + .thread_name("ballista_scheduler") .thread_stack_size(32 * 1024 * 1024) // 32MB .build() .unwrap();