
Commit

cleanup of unused code
milenkovicm committed Oct 8, 2024
1 parent f2f5139 commit cc55159
Showing 5 changed files with 15 additions and 567 deletions.
ballista/scheduler/src/bin/main.rs (5 changes: 1 addition & 4 deletions)
@@ -27,7 +27,7 @@ use ballista_core::config::LogRotationPolicy;
 use ballista_core::print_version;
 use ballista_scheduler::cluster::BallistaCluster;
 use ballista_scheduler::config::{
-    ClusterStorageConfig, SchedulerConfig, TaskDistribution, TaskDistributionPolicy,
+    SchedulerConfig, TaskDistribution, TaskDistributionPolicy,
 };
 use ballista_scheduler::scheduler_process::start_server;
 use tracing_subscriber::EnvFilter;
@@ -115,8 +115,6 @@ async fn inner() -> Result<()> {
     let addr = format!("{}:{}", opt.bind_host, opt.bind_port);
     let addr = addr.parse()?;
 
-    let cluster_storage_config = ClusterStorageConfig::Memory;
-
     let task_distribution = match opt.task_distribution {
         TaskDistribution::Bias => TaskDistributionPolicy::Bias,
         TaskDistribution::RoundRobin => TaskDistributionPolicy::RoundRobin,
@@ -142,7 +140,6 @@ async fn inner() -> Result<()> {
         finished_job_state_clean_up_interval_seconds: opt
             .finished_job_state_clean_up_interval_seconds,
         advertise_flight_sql_endpoint: opt.advertise_flight_sql_endpoint,
-        cluster_storage: cluster_storage_config,
         job_resubmit_interval_ms: (opt.job_resubmit_interval_ms > 0)
            .then_some(opt.job_resubmit_interval_ms),
         executor_termination_grace_period: opt.executor_termination_grace_period,
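Taken together, the three hunks above drop the intermediate `cluster_storage_config` binding and the struct field it fed. Below is a minimal sketch of the resulting construction, assuming the remaining `SchedulerConfig` fields stay public as the diff suggests; `Opt` is a hypothetical stand-in for the parsed CLI options, not the binary's real option struct:

```rust
use ballista_scheduler::config::SchedulerConfig;

// Hypothetical, trimmed stand-in for the parsed CLI options.
struct Opt {
    job_resubmit_interval_ms: u64,
    executor_termination_grace_period: u64,
}

fn build_config(opt: &Opt) -> SchedulerConfig {
    SchedulerConfig {
        // A zero interval disables resubmission; there is no longer a
        // cluster_storage field to populate.
        job_resubmit_interval_ms: (opt.job_resubmit_interval_ms > 0)
            .then_some(opt.job_resubmit_interval_ms),
        executor_termination_grace_period: opt.executor_termination_grace_period,
        ..SchedulerConfig::default()
    }
}
```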
ballista/scheduler/src/cluster/mod.rs (12 changes: 5 additions & 7 deletions)
@@ -42,7 +42,7 @@ use log::{debug, info, warn};
 
 use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState};
 
-use crate::config::{ClusterStorageConfig, SchedulerConfig, TaskDistributionPolicy};
+use crate::config::{SchedulerConfig, TaskDistributionPolicy};
 use crate::scheduler_server::SessionBuilder;
 use crate::state::execution_graph::{create_task_info, ExecutionGraph, TaskDescription};
 use crate::state::task_manager::JobInfoCache;
@@ -105,12 +105,10 @@ impl BallistaCluster {
     pub async fn new_from_config(config: &SchedulerConfig) -> Result<Self> {
         let scheduler = config.scheduler_name();
 
-        match &config.cluster_storage {
-            ClusterStorageConfig::Memory => Ok(BallistaCluster::new_memory(
-                scheduler,
-                default_session_builder,
-            )),
-        }
+        Ok(BallistaCluster::new_memory(
+            scheduler,
+            default_session_builder,
+        ))
     }
 
     pub fn cluster_state(&self) -> Arc<dyn ClusterState> {
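Because `Memory` was the only `ClusterStorageConfig` variant, the deleted match had exactly one arm, so `new_from_config` can return the in-memory cluster unconditionally. A sketch of a call site under that reading; the `Result` alias is assumed to be the one from `ballista_core::error` used elsewhere in these files:

```rust
use ballista_core::error::Result;
use ballista_scheduler::cluster::BallistaCluster;
use ballista_scheduler::config::SchedulerConfig;

async fn make_cluster() -> Result<BallistaCluster> {
    let config = SchedulerConfig::default();
    // Always in-memory now; equivalent to calling BallistaCluster::new_memory
    // with config.scheduler_name() and the default session builder.
    BallistaCluster::new_from_config(&config).await
}
```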
ballista/scheduler/src/config.rs (14 changes: 1 addition & 13 deletions)
@@ -47,8 +47,7 @@ pub struct SchedulerConfig {
     /// If provided, submitted jobs which do not have tasks scheduled will be resubmitted after `job_resubmit_interval_ms`
     /// milliseconds
     pub job_resubmit_interval_ms: Option<u64>,
-    /// Configuration for ballista cluster storage
-    pub cluster_storage: ClusterStorageConfig,
+
     /// Time in seconds to allow executor for graceful shutdown. Once an executor signals it has entered Terminating status
     /// the scheduler should only consider the executor dead after this time interval has elapsed
     pub executor_termination_grace_period: u64,
@@ -76,7 +75,6 @@ impl Default for SchedulerConfig {
             finished_job_data_clean_up_interval_seconds: 300,
             finished_job_state_clean_up_interval_seconds: 3600,
             advertise_flight_sql_endpoint: None,
-            cluster_storage: ClusterStorageConfig::Memory,
             job_resubmit_interval_ms: None,
             executor_termination_grace_period: 0,
             scheduler_event_expected_processing_duration: 0,
@@ -151,11 +149,6 @@ impl SchedulerConfig {
         self
     }
 
-    pub fn with_cluster_storage(mut self, config: ClusterStorageConfig) -> Self {
-        self.cluster_storage = config;
-        self
-    }
-
     pub fn with_job_resubmit_interval_ms(mut self, interval_ms: u64) -> Self {
         self.job_resubmit_interval_ms = Some(interval_ms);
         self
@@ -177,11 +170,6 @@ impl SchedulerConfig {
     }
 }
 
-#[derive(Clone, Debug)]
-pub enum ClusterStorageConfig {
-    Memory,
-}
-
 /// Policy of distributing tasks to available executor slots
 ///
 /// It needs to be visible to code generated by configure_me
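For downstream code, the visible change is that `SchedulerConfig` no longer exposes a `cluster_storage` field or a `with_cluster_storage` builder; the rest of the fluent API is untouched. A small usage sketch of the surviving builder (the 500 ms value is purely illustrative):

```rust
use ballista_scheduler::config::SchedulerConfig;

fn main() {
    // In-memory cluster state is now implicit; only the other knobs remain.
    let config = SchedulerConfig::default()
        .with_job_resubmit_interval_ms(500); // resubmit jobs stuck without scheduled tasks
    assert_eq!(config.job_resubmit_interval_ms, Some(500));
}
```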
ballista/scheduler/src/state/execution_graph.rs (171 changes: 4 additions & 167 deletions)
@@ -24,34 +24,28 @@ use std::time::{SystemTime, UNIX_EPOCH};
 
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::{accept, ExecutionPlan, ExecutionPlanVisitor};
-use datafusion::prelude::SessionContext;
-use datafusion_proto::logical_plan::AsLogicalPlan;
 use log::{error, info, warn};
 
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
 use ballista_core::serde::protobuf::failed_task::FailedReason;
 use ballista_core::serde::protobuf::job_status::Status;
-use ballista_core::serde::protobuf::{
-    self, execution_graph_stage::StageType, FailedTask, JobStatus, ResultLost,
-    RunningJob, SuccessfulJob, TaskStatus,
-};
 use ballista_core::serde::protobuf::{job_status, FailedJob, ShuffleWritePartition};
 use ballista_core::serde::protobuf::{task_status, RunningTask};
+use ballista_core::serde::protobuf::{
+    FailedTask, JobStatus, ResultLost, RunningJob, SuccessfulJob, TaskStatus,
+};
 use ballista_core::serde::scheduler::{
     ExecutorMetadata, PartitionId, PartitionLocation, PartitionStats,
 };
-use ballista_core::serde::BallistaCodec;
-use datafusion_proto::physical_plan::AsExecutionPlan;
 
 use crate::display::print_stage_metrics;
 use crate::planner::DistributedPlanner;
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
 use crate::scheduler_server::timestamp_millis;
 use crate::state::execution_graph::execution_stage::RunningStage;
 pub(crate) use crate::state::execution_graph::execution_stage::{
-    ExecutionStage, FailedStage, ResolvedStage, StageOutput, SuccessfulStage, TaskInfo,
-    UnresolvedStage,
+    ExecutionStage, ResolvedStage, StageOutput, TaskInfo, UnresolvedStage,
 };
 use crate::state::task_manager::UpdatedStages;
 
@@ -1317,163 +1311,6 @@ impl ExecutionGraph {
     fn clear_stage_failure(&mut self, stage_id: usize) {
         self.failed_stage_attempts.remove(&stage_id);
     }
-
-    pub(crate) async fn decode_execution_graph<
-        T: 'static + AsLogicalPlan,
-        U: 'static + AsExecutionPlan,
-    >(
-        proto: protobuf::ExecutionGraph,
-        codec: &BallistaCodec<T, U>,
-        session_ctx: &SessionContext,
-    ) -> Result<ExecutionGraph> {
-        let mut stages: HashMap<usize, ExecutionStage> = HashMap::new();
-        for graph_stage in proto.stages {
-            let stage_type = graph_stage.stage_type.expect("Unexpected empty stage");
-
-            let execution_stage = match stage_type {
-                StageType::UnresolvedStage(stage) => {
-                    let stage: UnresolvedStage =
-                        UnresolvedStage::decode(stage, codec, session_ctx)?;
-                    (stage.stage_id, ExecutionStage::UnResolved(stage))
-                }
-                StageType::ResolvedStage(stage) => {
-                    let stage: ResolvedStage =
-                        ResolvedStage::decode(stage, codec, session_ctx)?;
-                    (stage.stage_id, ExecutionStage::Resolved(stage))
-                }
-                StageType::SuccessfulStage(stage) => {
-                    let stage: SuccessfulStage =
-                        SuccessfulStage::decode(stage, codec, session_ctx)?;
-                    (stage.stage_id, ExecutionStage::Successful(stage))
-                }
-                StageType::FailedStage(stage) => {
-                    let stage: FailedStage =
-                        FailedStage::decode(stage, codec, session_ctx)?;
-                    (stage.stage_id, ExecutionStage::Failed(stage))
-                }
-            };
-
-            stages.insert(execution_stage.0, execution_stage.1);
-        }
-
-        let output_locations: Vec<PartitionLocation> = proto
-            .output_locations
-            .into_iter()
-            .map(|loc| loc.try_into())
-            .collect::<Result<Vec<_>>>()?;
-
-        let failed_stage_attempts = proto
-            .failed_attempts
-            .into_iter()
-            .map(|attempt| {
-                (
-                    attempt.stage_id as usize,
-                    HashSet::from_iter(
-                        attempt
-                            .stage_attempt_num
-                            .into_iter()
-                            .map(|num| num as usize),
-                    ),
-                )
-            })
-            .collect();
-
-        Ok(ExecutionGraph {
-            scheduler_id: (!proto.scheduler_id.is_empty()).then_some(proto.scheduler_id),
-            job_id: proto.job_id,
-            job_name: proto.job_name,
-            session_id: proto.session_id,
-            status: proto.status.ok_or_else(|| {
-                BallistaError::Internal(
-                    "Invalid Execution Graph: missing job status".to_owned(),
-                )
-            })?,
-            queued_at: proto.queued_at,
-            start_time: proto.start_time,
-            end_time: proto.end_time,
-            stages,
-            output_partitions: proto.output_partitions as usize,
-            output_locations,
-            task_id_gen: proto.task_id_gen as usize,
-            failed_stage_attempts,
-        })
-    }
-
-    /// Running stages will not be persisted so that will not be encoded.
-    /// Running stages will be convert back to the resolved stages to be encoded and persisted
-    pub(crate) fn encode_execution_graph<
-        T: 'static + AsLogicalPlan,
-        U: 'static + AsExecutionPlan,
-    >(
-        graph: ExecutionGraph,
-        codec: &BallistaCodec<T, U>,
-    ) -> Result<protobuf::ExecutionGraph> {
-        let job_id = graph.job_id().to_owned();
-
-        let stages = graph
-            .stages
-            .into_values()
-            .map(|stage| {
-                let stage_type = match stage {
-                    ExecutionStage::UnResolved(stage) => {
-                        StageType::UnresolvedStage(UnresolvedStage::encode(stage, codec)?)
-                    }
-                    ExecutionStage::Resolved(stage) => {
-                        StageType::ResolvedStage(ResolvedStage::encode(stage, codec)?)
-                    }
-                    ExecutionStage::Running(stage) => StageType::ResolvedStage(
-                        ResolvedStage::encode(stage.to_resolved(), codec)?,
-                    ),
-                    ExecutionStage::Successful(stage) => StageType::SuccessfulStage(
-                        SuccessfulStage::encode(job_id.clone(), stage, codec)?,
-                    ),
-                    ExecutionStage::Failed(stage) => StageType::FailedStage(
-                        FailedStage::encode(job_id.clone(), stage, codec)?,
-                    ),
-                };
-                Ok(protobuf::ExecutionGraphStage {
-                    stage_type: Some(stage_type),
-                })
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        let output_locations: Vec<protobuf::PartitionLocation> = graph
-            .output_locations
-            .into_iter()
-            .map(|loc| loc.try_into())
-            .collect::<Result<Vec<_>>>()?;
-
-        let failed_attempts: Vec<protobuf::StageAttempts> = graph
-            .failed_stage_attempts
-            .into_iter()
-            .map(|(stage_id, attempts)| {
-                let stage_attempt_num = attempts
-                    .into_iter()
-                    .map(|num| num as u32)
-                    .collect::<Vec<_>>();
-                protobuf::StageAttempts {
-                    stage_id: stage_id as u32,
-                    stage_attempt_num,
-                }
-            })
-            .collect::<Vec<_>>();
-
-        Ok(protobuf::ExecutionGraph {
-            job_id: graph.job_id,
-            job_name: graph.job_name,
-            session_id: graph.session_id,
-            status: Some(graph.status),
-            queued_at: graph.queued_at,
-            start_time: graph.start_time,
-            end_time: graph.end_time,
-            stages,
-            output_partitions: graph.output_partitions as u64,
-            output_locations,
-            scheduler_id: graph.scheduler_id.unwrap_or_default(),
-            task_id_gen: graph.task_id_gen as u32,
-            failed_attempts,
-        })
-    }
 }
 
 impl Debug for ExecutionGraph {