Skip to content

Commit

Permalink
fix: rest api /api/executors does not show executors if `TaskSchedu…
Browse files Browse the repository at this point in the history
…lingPolicy::PullStaged` (apache#1175)

* update comments and logs

* get list of connected executors

* add follow up comment
  • Loading branch information
milenkovicm authored Feb 10, 2025
1 parent 7121a17 commit e71847c
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 18 deletions.
5 changes: 5 additions & 0 deletions ballista/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,16 @@ impl datafusion::config::ConfigExtension for BallistaConfig {

// an enum used to configure the scheduler policy
// needs to be visible to code generated by configure_me

/// Ballista supports both push-based and pull-based task scheduling.
/// It is recommended that you try both to determine which is the best for your use case.
#[derive(Clone, Copy, Debug, serde::Deserialize, Default)]
#[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))]
pub enum TaskSchedulingPolicy {
/// Pull-based scheduling works in a similar way to Apache Spark
#[default]
PullStaged,
/// push-based scheduling can result in lower latency.
PushStaged,
}

Expand Down
4 changes: 2 additions & 2 deletions ballista/scheduler/src/api/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct ExecutorMetaResponse {
pub id: String,
pub host: String,
pub port: u16,
pub last_seen: u128,
pub last_seen: Option<u128>,
}

#[derive(Debug, serde::Serialize)]
Expand Down Expand Up @@ -98,7 +98,7 @@ pub async fn get_executors<
id: metadata.id,
host: metadata.host,
port: metadata.port,
last_seen: duration.as_millis(),
last_seen: duration.map(|d| d.as_millis()),
})
.collect();

Expand Down
14 changes: 13 additions & 1 deletion ballista/scheduler/src/cluster/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ impl ClusterState for InMemoryClusterState {
spec: ExecutorData,
) -> Result<()> {
let executor_id = metadata.id.clone();
log::debug!("registering executor: {}", executor_id);

self.save_executor_metadata(metadata).await?;
self.save_executor_heartbeat(ExecutorHeartbeat {
Expand All @@ -223,6 +224,11 @@ impl ClusterState for InMemoryClusterState {
}

async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()> {
log::debug!("save executor metadata: {}", metadata.id);
// TODO: MM it would make sense to add time when ExecutorMetadata is persisted
// we can do that adding additional field in ExecutorMetadata representing
// insert time. This information may be useful when reporting executor
// status and heartbeat is not available (in case of `TaskSchedulingPolicy::PullStaged`)
self.executors.insert(metadata.id.clone(), metadata);
Ok(())
}
Expand All @@ -239,6 +245,7 @@ impl ClusterState for InMemoryClusterState {
}

async fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) -> Result<()> {
log::debug!("saving executor heartbeat: {}", heartbeat.executor_id);
let executor_id = heartbeat.executor_id.clone();
if let Some(mut last) = self.heartbeats.get_mut(&executor_id) {
let _ = std::mem::replace(last.deref_mut(), heartbeat);
Expand All @@ -250,12 +257,13 @@ impl ClusterState for InMemoryClusterState {
}

async fn remove_executor(&self, executor_id: &str) -> Result<()> {
log::debug!("removing executor: {}", executor_id);
{
let mut guard = self.task_slots.lock().await;

guard.remove(executor_id);
}

self.executors.remove(executor_id);
self.heartbeats.remove(executor_id);

Ok(())
Expand All @@ -271,6 +279,10 @@ impl ClusterState for InMemoryClusterState {
fn get_executor_heartbeat(&self, executor_id: &str) -> Option<ExecutorHeartbeat> {
self.heartbeats.get(executor_id).map(|r| r.value().clone())
}

async fn registered_executor_metadata(&self) -> Vec<ExecutorMetadata> {
self.executors.iter().map(|v| v.clone()).collect()
}
}

/// Implementation of `JobState` which keeps all state in memory. If using `InMemoryJobState`
Expand Down
3 changes: 3 additions & 0 deletions ballista/scheduler/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ pub trait ClusterState: Send + Sync + 'static {
/// Get executor metadata for the provided executor ID. Returns an error if the executor does not exist
async fn get_executor_metadata(&self, executor_id: &str) -> Result<ExecutorMetadata>;

/// return list of registered executors
async fn registered_executor_metadata(&self) -> Vec<ExecutorMetadata>;

/// Save the executor heartbeat
async fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) -> Result<()>;

Expand Down
30 changes: 15 additions & 15 deletions ballista/scheduler/src/state/executor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,20 +192,16 @@ impl ExecutorManager {
}

/// Get a list of all executors along with the timestamp of their last recorded heartbeat
pub async fn get_executor_state(&self) -> Result<Vec<(ExecutorMetadata, Duration)>> {
let heartbeat_timestamps: Vec<(String, u64)> = self
.cluster_state
.executor_heartbeats()
.into_iter()
.map(|(executor_id, heartbeat)| (executor_id, heartbeat.timestamp))
.collect();

let mut state: Vec<(ExecutorMetadata, Duration)> = vec![];
for (executor_id, ts) in heartbeat_timestamps {
let duration = Duration::from_secs(ts);

let metadata = self.get_executor_metadata(&executor_id).await?;

pub async fn get_executor_state(
&self,
) -> Result<Vec<(ExecutorMetadata, Option<Duration>)>> {
let mut state: Vec<(ExecutorMetadata, Option<Duration>)> = vec![];
for metadata in self.cluster_state.registered_executor_metadata().await {
let duration = self
.cluster_state
.get_executor_heartbeat(&metadata.id)
.map(|hb| hb.timestamp)
.map(Duration::from_secs);
state.push((metadata, duration));
}

Expand All @@ -224,6 +220,10 @@ impl ExecutorManager {
///
/// For push-based one, we should use [`register_executor`], instead.
pub async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()> {
debug!(
"save executor metadata {} with {} task slots (pull-based registration)",
metadata.id, metadata.specification.task_slots
);
self.cluster_state.save_executor_metadata(metadata).await
}

Expand All @@ -238,7 +238,7 @@ impl ExecutorManager {
specification: ExecutorData,
) -> Result<()> {
debug!(
"registering executor {} with {} task slots",
"registering executor {} with {} task slots (push-based registration)",
metadata.id, specification.total_task_slots
);

Expand Down

0 comments on commit e71847c

Please sign in to comment.