Skip to content

Commit

Permalink
hide deps needed for binary behind optional feature
Browse files Browse the repository at this point in the history
as users can create their own scheduler/executor
binaries we should make optional dependencies which
are needed to for scheduler/executor binaries
  • Loading branch information
milenkovicm committed Dec 9, 2024
1 parent 466b0fe commit c9a2820
Show file tree
Hide file tree
Showing 13 changed files with 38 additions and 35 deletions.
10 changes: 6 additions & 4 deletions ballista/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,18 @@ executor = "executor_config_spec.toml"
[[bin]]
name = "ballista-executor"
path = "src/bin/main.rs"
required-features = ["build-binary"]

[features]
default = ["mimalloc"]
build-binary = ["configure_me", "tracing-subscriber", "tracing-appender", "tracing"]

[dependencies]
arrow = { workspace = true }
arrow-flight = { workspace = true }
async-trait = { workspace = true }
ballista-core = { path = "../core", version = "0.12.0" }
configure_me = { workspace = true }
configure_me = { workspace = true, optional = true }
dashmap = { workspace = true }
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
Expand All @@ -59,9 +61,9 @@ tokio = { workspace = true, features = [
] }
tokio-stream = { workspace = true, features = ["net"] }
tonic = { workspace = true }
tracing = { workspace = true }
tracing-appender = { workspace = true }
tracing-subscriber = { workspace = true }
tracing = { workspace = true, optional = true }
tracing-appender = { workspace = true, optional = true }
tracing-subscriber = { workspace = true, optional = true }
uuid = { workspace = true }

[dev-dependencies]
Expand Down
8 changes: 5 additions & 3 deletions ballista/executor/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
// specific language governing permissions and limitations
// under the License.

extern crate configure_me_codegen;

fn main() -> Result<(), String> {
#[cfg(feature = "build-binary")]
println!("cargo:rerun-if-changed=executor_config_spec.toml");
#[cfg(feature = "build-binary")]
configure_me_codegen::build_script_auto()
.map_err(|e| format!("configure_me code generation failed: {e}"))
.map_err(|e| format!("configure_me code generation failed: {e}"))?;

Ok(())
}
1 change: 1 addition & 0 deletions ballista/executor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::executor_process::ExecutorProcessConfig;

// Ideally we would use the include_config macro from configure_me, but then we cannot use
// #[allow(clippy::all)] to silence clippy warnings from the generated code

include!(concat!(env!("OUT_DIR"), "/executor_configure_me_config.rs"));

impl TryFrom<Config> for ExecutorProcessConfig {
Expand Down
6 changes: 4 additions & 2 deletions ballista/executor/src/flight_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use tokio::{sync::mpsc::Sender, task};
use tokio_stream::wrappers::ReceiverStream;
use tonic::metadata::MetadataValue;
use tonic::{Request, Response, Status, Streaming};
use tracing::warn;

/// Service implementing the Apache Arrow Flight Protocol
#[derive(Clone)]
Expand Down Expand Up @@ -103,7 +102,10 @@ impl FlightService for BallistaFlightService {
let schema = reader.schema();
task::spawn_blocking(move || {
if let Err(e) = read_partition(reader, tx) {
warn!(error = %e, "error streaming shuffle partition");
log::warn!(
"error streaming shuffle partition: {}",
e.to_string()
);
}
});

Expand Down
1 change: 1 addition & 0 deletions ballista/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#![doc = include_str!("../README.md")]

pub mod collect;
#[cfg(feature = "build-binary")]
pub mod config;
pub mod execution_engine;
pub mod execution_loop;
Expand Down
10 changes: 6 additions & 4 deletions ballista/scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ scheduler = "scheduler_config_spec.toml"
[[bin]]
name = "ballista-scheduler"
path = "src/bin/main.rs"
required-features = ["build-binary"]

[features]
default = []
flight-sql = []
keda-scaler = []
prometheus-metrics = ["prometheus", "once_cell"]
rest-api = []
build-binary = ["configure_me", "tracing-subscriber", "tracing-appender", "tracing"]

[dependencies]
arrow-flight = { workspace = true }
Expand All @@ -47,7 +49,7 @@ axum = "0.7.7"
ballista-core = { path = "../core", version = "0.12.0" }
base64 = { version = "0.22" }
clap = { workspace = true }
configure_me = { workspace = true }
configure_me = { workspace = true, optional = true }
dashmap = { workspace = true }
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
Expand All @@ -66,9 +68,9 @@ serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = ["full"] }
tokio-stream = { workspace = true, features = ["net"] }
tonic = { workspace = true }
tracing = { workspace = true }
tracing-appender = { workspace = true }
tracing-subscriber = { workspace = true }
tracing = { workspace = true, optional = true }
tracing-appender = { workspace = true, optional = true }
tracing-subscriber = { workspace = true, optional = true }
uuid = { workspace = true }

[dev-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions ballista/scheduler/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.

extern crate configure_me_codegen;

fn main() -> Result<(), String> {
#[cfg(feature = "build-binary")]
println!("cargo:rerun-if-changed=scheduler_config_spec.toml");
#[cfg(feature = "build-binary")]
configure_me_codegen::build_script_auto()
.map_err(|e| format!("configure_me code generation failed: {e}"))?;

Expand Down
3 changes: 1 addition & 2 deletions ballista/scheduler/src/cluster/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,14 @@ use crate::scheduler_server::{timestamp_millis, timestamp_secs, SessionBuilder};
use crate::state::session_manager::create_datafusion_context;
use crate::state::task_manager::JobInfoCache;
use ballista_core::serde::protobuf::job_status::Status;
use log::{error, info, warn};
use log::{debug, error, info, warn};
use std::collections::{HashMap, HashSet};
use std::ops::DerefMut;

use ballista_core::consistent_hash::node::Node;
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;
use tokio::sync::{Mutex, MutexGuard};
use tracing::debug;

#[derive(Default)]
pub struct InMemoryClusterState {
Expand Down
5 changes: 2 additions & 3 deletions ballista/scheduler/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

use std::collections::{HashMap, HashSet};
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;

Expand Down Expand Up @@ -69,9 +68,9 @@ impl std::str::FromStr for ClusterStorage {
ValueEnum::from_str(s, true)
}
}

#[cfg(feature = "build-binary")]
impl configure_me::parse_arg::ParseArgFromStr for ClusterStorage {
fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
fn describe_type<W: std::fmt::Write>(mut writer: W) -> std::fmt::Result {
write!(writer, "The cluster storage backend for the scheduler")
}
}
Expand Down
12 changes: 7 additions & 5 deletions ballista/scheduler/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
//! Ballista scheduler specific configuration
use crate::SessionBuilder;
use ballista_core::{config::TaskSchedulingPolicy, error::BallistaError, ConfigProducer};
use ballista_core::{config::TaskSchedulingPolicy, ConfigProducer};
use clap::ValueEnum;
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use std::{fmt, sync::Arc};
use std::sync::Arc;

#[cfg(feature = "build-binary")]
include!(concat!(
env!("OUT_DIR"),
"/scheduler_configure_me_config.rs"
Expand Down Expand Up @@ -233,8 +234,9 @@ impl std::str::FromStr for TaskDistribution {
}
}

#[cfg(feature = "build-binary")]
impl configure_me::parse_arg::ParseArgFromStr for TaskDistribution {
fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
fn describe_type<W: std::fmt::Write>(mut writer: W) -> std::fmt::Result {
write!(writer, "The executor slots policy for the scheduler")
}
}
Expand All @@ -257,9 +259,9 @@ pub enum TaskDistributionPolicy {
tolerance: usize,
},
}

#[cfg(feature = "build-binary")]
impl TryFrom<Config> for SchedulerConfig {
type Error = BallistaError;
type Error = ballista_core::error::BallistaError;

fn try_from(opt: Config) -> Result<Self, Self::Error> {
let task_distribution = match opt.task_distribution {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,14 +359,9 @@ mod tests {
use datafusion::test_util::scan_empty_with_partitions;
use std::sync::Arc;
use std::time::Duration;
use tracing_subscriber::EnvFilter;

#[tokio::test]
async fn test_pending_job_metric() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();

let plan = test_plan(10);

let metrics_collector = Arc::new(TestMetricsCollector::default());
Expand Down
4 changes: 1 addition & 3 deletions ballista/scheduler/src/state/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use dashmap::DashMap;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::logical_plan::AsLogicalPlan;
use datafusion_proto::physical_plan::AsExecutionPlan;
use log::{debug, error, info, warn};
use log::{debug, error, info, trace, warn};
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use std::collections::{HashMap, HashSet};
Expand All @@ -48,8 +48,6 @@ use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;

use tracing::trace;

type ActiveJobCache = Arc<DashMap<String, JobInfoCache>>;

// TODO move to configuration file
Expand Down
4 changes: 2 additions & 2 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ required-features = ["ballista/standalone"]
[dependencies]
ballista = { path = "../ballista/client", version = "0.12.0" }
ballista-core = { path = "../ballista/core", version = "0.12.0" }
ballista-executor = { path = "../ballista/executor", version = "0.12.0" }
ballista-scheduler = { path = "../ballista/scheduler", version = "0.12.0" }
ballista-executor = { path = "../ballista/executor", version = "0.12.0", features = ["build-binary"] }
ballista-scheduler = { path = "../ballista/scheduler", version = "0.12.0", features = ["build-binary"] }
datafusion = { workspace = true }
env_logger = { workspace = true }
log = { workspace = true }
Expand Down

0 comments on commit c9a2820

Please sign in to comment.