From 94359f7a9fb642f1c4fbb6df8cd943d4768280c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Tue, 26 Nov 2024 21:53:42 +0000 Subject: [PATCH] limit `concurrent_task` to `executor_cores` --- ballista/executor/executor_config_spec.toml | 2 +- ballista/executor/src/bin/main.rs | 6 +----- ballista/executor/src/config.rs | 13 ++++++++++++- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/ballista/executor/executor_config_spec.toml b/ballista/executor/executor_config_spec.toml index 389d7f44b9..92c7dfda6a 100644 --- a/ballista/executor/executor_config_spec.toml +++ b/ballista/executor/executor_config_spec.toml @@ -75,7 +75,6 @@ doc = "Directory for temporary IPC files" abbr = "c" name = "concurrent_tasks" type = "usize" -default = "0" # defaults to all available cores if left as zero doc = "Max concurrent tasks." [[param]] @@ -167,6 +166,7 @@ doc = "The number of worker threads for the runtime of caching. Default: 2" default = "2" [[param]] +abbr = "e" name = "executor_cores" type = "usize" doc = "The number of worker threads. Default: number of available cores" \ No newline at end of file diff --git a/ballista/executor/src/bin/main.rs b/ballista/executor/src/bin/main.rs index 96383cf8f9..81b925f1e9 100644 --- a/ballista/executor/src/bin/main.rs +++ b/ballista/executor/src/bin/main.rs @@ -37,14 +37,10 @@ fn main() -> Result<()> { Config::including_optional_config_files(&["/etc/ballista/executor.toml"]) .unwrap_or_exit(); - let executor_cores = opt - .executor_cores - .unwrap_or_else(|| std::thread::available_parallelism().unwrap().get()); - let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() .thread_name("ballista_executor") - .worker_threads(executor_cores) + .worker_threads(opt.executor_cores_or_default()) .build() .unwrap(); diff --git a/ballista/executor/src/config.rs b/ballista/executor/src/config.rs index 78db477f9b..82d42d2a7c 100644 --- a/ballista/executor/src/config.rs +++ b/ballista/executor/src/config.rs @@ -23,6 +23,15 @@ use crate::executor_process::ExecutorProcessConfig; // #[allow(clippy::all)] to silence clippy warnings from the generated code include!(concat!(env!("OUT_DIR"), "/executor_configure_me_config.rs")); +impl Config { + /// returns executor cores if setup or number of available + /// cpu cores + pub fn executor_cores_or_default(&self) -> usize { + self.executor_cores + .unwrap_or_else(|| std::thread::available_parallelism().unwrap().get()) + } +} + impl TryFrom for ExecutorProcessConfig { type Error = BallistaError; @@ -35,6 +44,8 @@ impl TryFrom for ExecutorProcessConfig { opt.bind_port ); + let concurrent_tasks = opt.executor_cores_or_default(); + Ok(ExecutorProcessConfig { special_mod_log_level: opt.log_level_setting, external_host: opt.external_host, @@ -44,7 +55,7 @@ impl TryFrom for ExecutorProcessConfig { scheduler_host: opt.scheduler_host, scheduler_port: opt.scheduler_port, scheduler_connect_timeout_seconds: opt.scheduler_connect_timeout_seconds, - concurrent_tasks: opt.concurrent_tasks, + concurrent_tasks, task_scheduling_policy: opt.task_scheduling_policy, work_dir: opt.work_dir, log_dir: opt.log_dir,