diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs index cb7f7c5d7..628821447 100644 --- a/ballista/core/src/config.rs +++ b/ballista/core/src/config.rs @@ -250,11 +250,16 @@ impl datafusion::config::ConfigExtension for BallistaConfig { // an enum used to configure the scheduler policy // needs to be visible to code generated by configure_me + +/// Ballista supports both push-based and pull-based task scheduling. +/// It is recommended that you try both to determine which is the best for your use case. #[derive(Clone, Copy, Debug, serde::Deserialize, Default)] #[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))] pub enum TaskSchedulingPolicy { + /// Pull-based scheduling works in a similar way to Apache Spark. #[default] PullStaged, + /// Push-based scheduling can result in lower latency. PushStaged, } diff --git a/ballista/scheduler/src/api/handlers.rs b/ballista/scheduler/src/api/handlers.rs index 4d0366ff8..fc9d5842d 100644 --- a/ballista/scheduler/src/api/handlers.rs +++ b/ballista/scheduler/src/api/handlers.rs @@ -41,7 +41,7 @@ pub struct ExecutorMetaResponse { pub id: String, pub host: String, pub port: u16, - pub last_seen: u128, + pub last_seen: Option<u128>, } #[derive(Debug, serde::Serialize)] @@ -98,7 +98,7 @@ pub async fn get_executors< id: metadata.id, host: metadata.host, port: metadata.port, - last_seen: duration.as_millis(), + last_seen: duration.map(|d| d.as_millis()), }) .collect(); diff --git a/ballista/scheduler/src/cluster/memory.rs b/ballista/scheduler/src/cluster/memory.rs index 759c52c82..918a0b682 100644 --- a/ballista/scheduler/src/cluster/memory.rs +++ b/ballista/scheduler/src/cluster/memory.rs @@ -197,6 +197,7 @@ impl ClusterState for InMemoryClusterState { spec: ExecutorData, ) -> Result<()> { let executor_id = metadata.id.clone(); + log::debug!("registering executor: {}", executor_id); self.save_executor_metadata(metadata).await?; self.save_executor_heartbeat(ExecutorHeartbeat { @@ -223,6 +224,11 @@ impl 
ClusterState for InMemoryClusterState { } async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()> { + log::debug!("save executor metadata: {}", metadata.id); + // TODO: MM it would make sense to record the time when ExecutorMetadata is persisted; + // we can do that by adding an additional field to ExecutorMetadata representing + // the insert time. This information may be useful when reporting executor + // status when a heartbeat is not available (in case of `TaskSchedulingPolicy::PullStaged`). self.executors.insert(metadata.id.clone(), metadata); Ok(()) } @@ -239,6 +245,7 @@ impl ClusterState for InMemoryClusterState { } async fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) -> Result<()> { + log::debug!("saving executor heartbeat: {}", heartbeat.executor_id); let executor_id = heartbeat.executor_id.clone(); if let Some(mut last) = self.heartbeats.get_mut(&executor_id) { let _ = std::mem::replace(last.deref_mut(), heartbeat); @@ -250,12 +257,13 @@ impl ClusterState for InMemoryClusterState { } async fn remove_executor(&self, executor_id: &str) -> Result<()> { + log::debug!("removing executor: {}", executor_id); { let mut guard = self.task_slots.lock().await; guard.remove(executor_id); } - + self.executors.remove(executor_id); self.heartbeats.remove(executor_id); Ok(()) @@ -271,6 +279,10 @@ impl ClusterState for InMemoryClusterState { fn get_executor_heartbeat(&self, executor_id: &str) -> Option<ExecutorHeartbeat> { self.heartbeats.get(executor_id).map(|r| r.value().clone()) } + + async fn registered_executor_metadata(&self) -> Vec<ExecutorMetadata> { + self.executors.iter().map(|v| v.clone()).collect() + } } /// Implementation of `JobState` which keeps all state in memory. 
If using `InMemoryJobState` diff --git a/ballista/scheduler/src/cluster/mod.rs b/ballista/scheduler/src/cluster/mod.rs index 5be42ee79..fd546f59b 100644 --- a/ballista/scheduler/src/cluster/mod.rs +++ b/ballista/scheduler/src/cluster/mod.rs @@ -186,6 +186,9 @@ pub trait ClusterState: Send + Sync + 'static { /// Get executor metadata for the provided executor ID. Returns an error if the executor does not exist async fn get_executor_metadata(&self, executor_id: &str) -> Result<ExecutorMetadata>; + /// Return the metadata of all registered executors + async fn registered_executor_metadata(&self) -> Vec<ExecutorMetadata>; + /// Save the executor heartbeat async fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) -> Result<()>; diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs index 702d38ead..0248a85ef 100644 --- a/ballista/scheduler/src/state/executor_manager.rs +++ b/ballista/scheduler/src/state/executor_manager.rs @@ -192,20 +192,16 @@ impl ExecutorManager { } /// Get a list of all executors along with the timestamp of their last recorded heartbeat - pub async fn get_executor_state(&self) -> Result<Vec<(ExecutorMetadata, Duration)>> { - let heartbeat_timestamps: Vec<(String, u64)> = self - .cluster_state - .executor_heartbeats() - .into_iter() - .map(|(executor_id, heartbeat)| (executor_id, heartbeat.timestamp)) - .collect(); - - let mut state: Vec<(ExecutorMetadata, Duration)> = vec![]; - for (executor_id, ts) in heartbeat_timestamps { - let duration = Duration::from_secs(ts); - - let metadata = self.get_executor_metadata(&executor_id).await?; - + pub async fn get_executor_state( + &self, + ) -> Result<Vec<(ExecutorMetadata, Option<Duration>)>> { + let mut state: Vec<(ExecutorMetadata, Option<Duration>)> = vec![]; + for metadata in self.cluster_state.registered_executor_metadata().await { + let duration = self + .cluster_state + .get_executor_heartbeat(&metadata.id) + .map(|hb| hb.timestamp) + .map(Duration::from_secs); state.push((metadata, duration)); } @@ -224,6 +220,10 @@ impl ExecutorManager { /// /// For push-based 
one, we should use [`register_executor`], instead. pub async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()> { + debug!( + "save executor metadata {} with {} task slots (pull-based registration)", + metadata.id, metadata.specification.task_slots + ); self.cluster_state.save_executor_metadata(metadata).await } @@ -238,7 +238,7 @@ impl ExecutorManager { specification: ExecutorData, ) -> Result<()> { debug!( - "registering executor {} with {} task slots", + "registering executor {} with {} task slots (push-based registration)", metadata.id, specification.total_task_slots );