Skip to content

Commit

Permalink
feat: Add option to limit number of executor worker threads
Browse files Browse the repository at this point in the history
this option is similar to `spark.executor.cores`
it sets number of tokio worker threads.
  • Loading branch information
milenkovicm committed Nov 26, 2024
1 parent 7947b07 commit 74ca8dd
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 11 deletions.
8 changes: 1 addition & 7 deletions ballista/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,7 @@ log = { workspace = true }
mimalloc = { workspace = true, default-features = false, optional = true }
parking_lot = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = [
"macros",
"rt",
"rt-multi-thread",
"parking_lot",
"signal",
] }
tokio = { workspace = true, features = ["full"] }
tokio-stream = { workspace = true, features = ["net"] }
tonic = { workspace = true }
tracing = { workspace = true }
Expand Down
7 changes: 6 additions & 1 deletion ballista/executor/executor_config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,9 @@ default = "1073741824"
name = "cache_io_concurrency"
type = "u32"
doc = "The number of worker threads for the runtime of caching. Default: 2"
default = "2"
default = "2"

[[param]]
name = "executor_cores"
type = "usize"
doc = "The number of worker threads. Default: number of available cores"
19 changes: 16 additions & 3 deletions ballista/executor/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,26 @@ use tracing_subscriber::EnvFilter;
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

#[tokio::main]
async fn main() -> Result<()> {
// parse command-line arguments
fn main() -> Result<()> {
let (opt, _remaining_args) =
Config::including_optional_config_files(&["/etc/ballista/executor.toml"])
.unwrap_or_exit();

let executor_cores = opt
.executor_cores
.unwrap_or_else(|| std::thread::available_parallelism().unwrap().get());

let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("ballista_executor")
.worker_threads(executor_cores)
.build()
.unwrap();

runtime.block_on(inner(opt))
}

async fn inner(opt: Config) -> Result<()> {
if opt.version {
print_version();
std::process::exit(0);
Expand Down
1 change: 1 addition & 0 deletions ballista/scheduler/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ fn main() -> Result<()> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_io()
.enable_time()
.thread_name("ballista_scheduler")
.thread_stack_size(32 * 1024 * 1024) // 32MB
.build()
.unwrap();
Expand Down

0 comments on commit 74ca8dd

Please sign in to comment.