Make it easier to create schedulers and executors
milenkovicm committed Nov 18, 2024
1 parent 8928a70 commit 982a54c
Showing 10 changed files with 265 additions and 165 deletions.
58 changes: 3 additions & 55 deletions ballista/executor/src/bin/main.rs
@@ -18,24 +18,12 @@
//! Ballista Rust executor binary.
use anyhow::Result;
use std::sync::Arc;

use ballista_core::print_version;
use ballista_executor::config::prelude::*;
use ballista_executor::executor_process::{
start_executor_process, ExecutorProcessConfig,
};
use config::prelude::*;

#[allow(unused_imports)]
#[macro_use]
extern crate configure_me;

#[allow(clippy::all, warnings)]
mod config {
// Ideally we would use the include_config macro from configure_me, but then we cannot use
// #[allow(clippy::all)] to silence clippy warnings from the generated code
include!(concat!(env!("OUT_DIR"), "/executor_configure_me_config.rs"));
}
use std::sync::Arc;

#[cfg(feature = "mimalloc")]
#[global_allocator]
@@ -53,46 +41,6 @@ async fn main() -> Result<()> {
std::process::exit(0);
}

let log_file_name_prefix = format!(
"executor_{}_{}",
opt.external_host
.clone()
.unwrap_or_else(|| "localhost".to_string()),
opt.bind_port
);

let config = ExecutorProcessConfig {
special_mod_log_level: opt.log_level_setting,
external_host: opt.external_host,
bind_host: opt.bind_host,
port: opt.bind_port,
grpc_port: opt.bind_grpc_port,
scheduler_host: opt.scheduler_host,
scheduler_port: opt.scheduler_port,
scheduler_connect_timeout_seconds: opt.scheduler_connect_timeout_seconds,
concurrent_tasks: opt.concurrent_tasks,
task_scheduling_policy: opt.task_scheduling_policy,
work_dir: opt.work_dir,
log_dir: opt.log_dir,
log_file_name_prefix,
log_rotation_policy: opt.log_rotation_policy,
print_thread_info: opt.print_thread_info,
job_data_ttl_seconds: opt.job_data_ttl_seconds,
job_data_clean_up_interval_seconds: opt.job_data_clean_up_interval_seconds,
grpc_max_decoding_message_size: opt.grpc_server_max_decoding_message_size,
grpc_max_encoding_message_size: opt.grpc_server_max_encoding_message_size,
executor_heartbeat_interval_seconds: opt.executor_heartbeat_interval_seconds,
data_cache_policy: opt.data_cache_policy,
cache_dir: opt.cache_dir,
cache_capacity: opt.cache_capacity,
cache_io_concurrency: opt.cache_io_concurrency,
execution_engine: None,
function_registry: None,
config_producer: None,
runtime_producer: None,
logical_codec: None,
physical_codec: None,
};

let config: ExecutorProcessConfig = opt.try_into()?;
start_executor_process(Arc::new(config)).await
}
71 changes: 71 additions & 0 deletions ballista/executor/src/config.rs
@@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use ballista_core::error::BallistaError;

use crate::executor_process::ExecutorProcessConfig;

// Ideally we would use the include_config macro from configure_me, but then we cannot use
// #[allow(clippy::all)] to silence clippy warnings from the generated code
include!(concat!(env!("OUT_DIR"), "/executor_configure_me_config.rs"));

impl TryFrom<Config> for ExecutorProcessConfig {
type Error = BallistaError;

fn try_from(opt: Config) -> Result<Self, Self::Error> {
let log_file_name_prefix = format!(
"executor_{}_{}",
opt.external_host
.clone()
.unwrap_or_else(|| "localhost".to_string()),
opt.bind_port
);

Ok(ExecutorProcessConfig {
special_mod_log_level: opt.log_level_setting,
external_host: opt.external_host,
bind_host: opt.bind_host,
port: opt.bind_port,
grpc_port: opt.bind_grpc_port,
scheduler_host: opt.scheduler_host,
scheduler_port: opt.scheduler_port,
scheduler_connect_timeout_seconds: opt.scheduler_connect_timeout_seconds,
concurrent_tasks: opt.concurrent_tasks,
task_scheduling_policy: opt.task_scheduling_policy,
work_dir: opt.work_dir,
log_dir: opt.log_dir,
log_file_name_prefix,
log_rotation_policy: opt.log_rotation_policy,
print_thread_info: opt.print_thread_info,
job_data_ttl_seconds: opt.job_data_ttl_seconds,
job_data_clean_up_interval_seconds: opt.job_data_clean_up_interval_seconds,
grpc_max_decoding_message_size: opt.grpc_server_max_decoding_message_size,
grpc_max_encoding_message_size: opt.grpc_server_max_encoding_message_size,
executor_heartbeat_interval_seconds: opt.executor_heartbeat_interval_seconds,
data_cache_policy: opt.data_cache_policy,
cache_dir: opt.cache_dir,
cache_capacity: opt.cache_capacity,
cache_io_concurrency: opt.cache_io_concurrency,
override_execution_engine: None,
override_function_registry: None,
override_config_producer: None,
override_runtime_producer: None,
override_logical_codec: None,
override_physical_codec: None,
})
}
}
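
For downstream projects, the new `TryFrom<Config>` impl plus the re-exported generated config make a custom executor binary roughly this small. A minimal sketch, assuming the configure_me-generated `Config::including_optional_config_files` / `unwrap_or_exit` parsing pattern used by Ballista's own binaries, tokio with the `macros` feature, and anyhow; the config-file path is only an example.

```rust
// Hypothetical downstream executor binary; depends on ballista-executor,
// tokio (macros feature) and anyhow. Not the upstream binary itself.
use std::sync::Arc;

use anyhow::Result;
use ballista_executor::config::prelude::*;
use ballista_executor::executor_process::{
    start_executor_process, ExecutorProcessConfig,
};

#[tokio::main]
async fn main() -> Result<()> {
    // Parse CLI flags, environment variables and an optional config file into
    // the configure_me-generated `Config`.
    let (opt, _remaining_args) =
        Config::including_optional_config_files(&["/etc/ballista/executor.toml"])
            .unwrap_or_exit();

    // Reuse the conversion added in this commit instead of copying the
    // field-by-field mapping into every embedder.
    let config: ExecutorProcessConfig = opt.try_into()?;

    start_executor_process(Arc::new(config)).await
}
```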
22 changes: 11 additions & 11 deletions ballista/executor/src/executor_process.rs
@@ -98,17 +98,17 @@ pub struct ExecutorProcessConfig {
pub executor_heartbeat_interval_seconds: u64,
/// Optional execution engine to use to execute physical plans, will default to
/// DataFusion if none is provided.
pub execution_engine: Option<Arc<dyn ExecutionEngine>>,
pub override_execution_engine: Option<Arc<dyn ExecutionEngine>>,
/// Overrides default function registry
pub function_registry: Option<Arc<BallistaFunctionRegistry>>,
pub override_function_registry: Option<Arc<BallistaFunctionRegistry>>,
/// [RuntimeProducer] override option
pub runtime_producer: Option<RuntimeProducer>,
pub override_runtime_producer: Option<RuntimeProducer>,
/// [ConfigProducer] override option
pub config_producer: Option<ConfigProducer>,
pub override_config_producer: Option<ConfigProducer>,
/// [LogicalExtensionCodec] override option
pub logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
pub override_logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
/// [PhysicalExtensionCodec] override option
pub physical_codec: Option<Arc<dyn PhysicalExtensionCodec>>,
pub override_physical_codec: Option<Arc<dyn PhysicalExtensionCodec>>,
}

pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<()> {
@@ -194,7 +194,7 @@ pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<()> {
// put them to session config
let metrics_collector = Arc::new(LoggingMetricsCollector::default());
let config_producer = opt
.config_producer
.override_config_producer
.clone()
.unwrap_or_else(|| Arc::new(default_config_producer));

@@ -205,12 +205,12 @@ pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<()> {
});

let logical = opt
.logical_codec
.override_logical_codec
.clone()
.unwrap_or_else(|| Arc::new(BallistaLogicalExtensionCodec::default()));

let physical = opt
.physical_codec
.override_physical_codec
.clone()
.unwrap_or_else(|| Arc::new(BallistaPhysicalExtensionCodec::default()));

@@ -224,10 +224,10 @@ pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<()> {
&work_dir,
runtime_producer,
config_producer,
opt.function_registry.clone().unwrap_or_default(),
opt.override_function_registry.clone().unwrap_or_default(),
metrics_collector,
concurrent_tasks,
opt.execution_engine.clone(),
opt.override_execution_engine.clone(),
));

let connect_timeout = opt.scheduler_connect_timeout_seconds as u64;
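
The renamed `override_*` fields are the embedder hook points consumed above. A sketch of filling two of them, assuming the default codecs referenced in this diff are importable from `ballista_core::serde`; in practice an embedder would pass its own `LogicalExtensionCodec` / `PhysicalExtensionCodec` implementations rather than the defaults shown here.

```rust
use std::sync::Arc;

use ballista_core::serde::{BallistaLogicalExtensionCodec, BallistaPhysicalExtensionCodec};
use ballista_executor::executor_process::ExecutorProcessConfig;

/// Populate the override hooks on an already-built config. The Ballista
/// defaults are used for illustration; real embedders would supply codecs
/// that understand their plan extensions.
fn with_codecs(mut config: ExecutorProcessConfig) -> ExecutorProcessConfig {
    config.override_logical_codec =
        Some(Arc::new(BallistaLogicalExtensionCodec::default()));
    config.override_physical_codec =
        Some(Arc::new(BallistaPhysicalExtensionCodec::default()));
    config
}
```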
1 change: 1 addition & 0 deletions ballista/executor/src/lib.rs
@@ -18,6 +18,7 @@
#![doc = include_str!("../README.md")]

pub mod collect;
pub mod config;
pub mod execution_engine;
pub mod execution_loop;
pub mod executor;
4 changes: 2 additions & 2 deletions ballista/scheduler/scheduler_config_spec.toml
@@ -82,9 +82,9 @@ doc = "Delayed interval for cleaning up finished job state. Default: 3600"

[[param]]
name = "task_distribution"
type = "ballista_scheduler::config::TaskDistribution"
type = "crate::config::TaskDistribution"
doc = "The policy of distributing tasks to available executor slots, possible values: bias, round-robin, consistent-hash. Default: bias"
default = "ballista_scheduler::config::TaskDistribution::Bias"
default = "crate::config::TaskDistribution::Bias"

[[param]]
name = "consistent_hash_num_replicas"
111 changes: 22 additions & 89 deletions ballista/scheduler/src/bin/main.rs
@@ -17,35 +17,16 @@

//! Ballista Rust scheduler binary.
use std::sync::Arc;
use std::{env, io};

use anyhow::Result;

use crate::config::{Config, ResultExt};
use ballista_core::config::LogRotationPolicy;
use ballista_core::print_version;
use ballista_scheduler::cluster::BallistaCluster;
use ballista_scheduler::config::{
ClusterStorageConfig, SchedulerConfig, TaskDistribution, TaskDistributionPolicy,
};
use ballista_scheduler::config::{Config, ResultExt};
use ballista_scheduler::scheduler_process::start_server;
use std::sync::Arc;
use std::{env, io};
use tracing_subscriber::EnvFilter;

#[allow(unused_imports)]
#[macro_use]
extern crate configure_me;

#[allow(clippy::all, warnings)]
mod config {
// Ideally we would use the include_config macro from configure_me, but then we cannot use
// #[allow(clippy::all)] to silence clippy warnings from the generated code
include!(concat!(
env!("OUT_DIR"),
"/scheduler_configure_me_config.rs"
));
}

fn main() -> Result<()> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_io()
@@ -67,19 +48,23 @@ async fn inner() -> Result<()> {
std::process::exit(0);
}

let special_mod_log_level = opt.log_level_setting;
let log_dir = opt.log_dir;
let print_thread_info = opt.print_thread_info;
let rust_log = env::var(EnvFilter::DEFAULT_ENV);
let log_filter = EnvFilter::new(rust_log.unwrap_or(opt.log_level_setting.clone()));

let log_file_name_prefix = format!(
"scheduler_{}_{}_{}",
opt.namespace, opt.external_host, opt.bind_port
);
let tracing = tracing_subscriber::fmt()
.with_ansi(false)
.with_thread_names(opt.print_thread_info)
.with_thread_ids(opt.print_thread_info)
.with_writer(io::stdout)
.with_env_filter(log_filter);

let rust_log = env::var(EnvFilter::DEFAULT_ENV);
let log_filter = EnvFilter::new(rust_log.unwrap_or(special_mod_log_level));
// File layer
if let Some(log_dir) = log_dir {
if let Some(log_dir) = &opt.log_dir {
let log_file_name_prefix = format!(
"scheduler_{}_{}_{}",
opt.namespace, opt.external_host, opt.bind_port
);

let log_file = match opt.log_rotation_policy {
LogRotationPolicy::Minutely => {
tracing_appender::rolling::minutely(log_dir, &log_file_name_prefix)
@@ -94,68 +79,16 @@ async fn inner() -> Result<()> {
tracing_appender::rolling::never(log_dir, &log_file_name_prefix)
}
};
tracing_subscriber::fmt()
.with_ansi(false)
.with_thread_names(print_thread_info)
.with_thread_ids(print_thread_info)
.with_writer(log_file)
.with_env_filter(log_filter)
.init();

tracing.with_writer(log_file).init();
} else {
// Console layer
tracing_subscriber::fmt()
.with_ansi(false)
.with_thread_names(print_thread_info)
.with_thread_ids(print_thread_info)
.with_writer(io::stdout)
.with_env_filter(log_filter)
.init();
tracing.init();
}

let addr = format!("{}:{}", opt.bind_host, opt.bind_port);
let addr = addr.parse()?;

let cluster_storage_config = ClusterStorageConfig::Memory;

let task_distribution = match opt.task_distribution {
TaskDistribution::Bias => TaskDistributionPolicy::Bias,
TaskDistribution::RoundRobin => TaskDistributionPolicy::RoundRobin,
TaskDistribution::ConsistentHash => {
let num_replicas = opt.consistent_hash_num_replicas as usize;
let tolerance = opt.consistent_hash_tolerance as usize;
TaskDistributionPolicy::ConsistentHash {
num_replicas,
tolerance,
}
}
};

let config = SchedulerConfig {
namespace: opt.namespace,
external_host: opt.external_host,
bind_port: opt.bind_port,
scheduling_policy: opt.scheduler_policy,
event_loop_buffer_size: opt.event_loop_buffer_size,
task_distribution,
finished_job_data_clean_up_interval_seconds: opt
.finished_job_data_clean_up_interval_seconds,
finished_job_state_clean_up_interval_seconds: opt
.finished_job_state_clean_up_interval_seconds,
advertise_flight_sql_endpoint: opt.advertise_flight_sql_endpoint,
cluster_storage: cluster_storage_config,
job_resubmit_interval_ms: (opt.job_resubmit_interval_ms > 0)
.then_some(opt.job_resubmit_interval_ms),
executor_termination_grace_period: opt.executor_termination_grace_period,
scheduler_event_expected_processing_duration: opt
.scheduler_event_expected_processing_duration,
grpc_server_max_decoding_message_size: opt.grpc_server_max_decoding_message_size,
grpc_server_max_encoding_message_size: opt.grpc_server_max_encoding_message_size,
executor_timeout_seconds: opt.executor_timeout_seconds,
expire_dead_executor_interval_seconds: opt.expire_dead_executor_interval_seconds,
};

let config = opt.try_into()?;
let cluster = BallistaCluster::new_from_config(&config).await?;

start_server(cluster, addr, Arc::new(config)).await?;

Ok(())
}
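
The scheduler side follows the same shape. A minimal embedded-scheduler sketch, assuming `ballista_scheduler::config` re-exports the generated `Config` / `ResultExt` (as the new imports above indicate) and provides the `TryFrom<Config>` conversion behind `opt.try_into()?`; `#[tokio::main]` is used for brevity where the real binary builds its runtime by hand, and the config-file path is only an example.

```rust
use std::sync::Arc;

use anyhow::Result;
use ballista_scheduler::cluster::BallistaCluster;
use ballista_scheduler::config::{Config, ResultExt, SchedulerConfig};
use ballista_scheduler::scheduler_process::start_server;

#[tokio::main]
async fn main() -> Result<()> {
    // Parse CLI flags, env vars and an optional config file.
    let (opt, _remaining_args) =
        Config::including_optional_config_files(&["/etc/ballista/scheduler.toml"])
            .unwrap_or_exit();

    // Bind address comes straight from the parsed options ...
    let addr = format!("{}:{}", opt.bind_host, opt.bind_port).parse()?;

    // ... and the rest of the scheduler configuration from the new conversion.
    let config: SchedulerConfig = opt.try_into()?;

    let cluster = BallistaCluster::new_from_config(&config).await?;
    start_server(cluster, addr, Arc::new(config)).await?;
    Ok(())
}
```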
2 changes: 1 addition & 1 deletion ballista/scheduler/src/cluster/memory.rs
@@ -290,7 +290,7 @@ pub struct InMemoryJobState {
session_builder: SessionBuilder,
/// Sender of job events
job_event_sender: ClusterEventSender<JobStateEvent>,

/// Config producer
config_producer: ConfigProducer,
}

(3 more changed files not shown)
