From 8b10bd1a46990812dba6ea79b834d2aa412a2990 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Sun, 24 Nov 2024 13:39:56 +0000 Subject: [PATCH] refactor: SessionStateExt and SessionConfigExt (#1130) * refactor: SessionStateExt and SessionConfigExt moving them out from util to their own module as util is too busy, also it will align with naming in used in client. also, split `SessionConfigExt` to `SessionConfigExt` and `SessionConfigHelperExt`. `SessionConfigExt` will stay user facing, `SessionConfigHelperExt` will be used internally. This reduces ballista client public api * remove deprecated context creation --- ballista/client/src/extension.rs | 5 +- ballista/core/src/extension.rs | 438 ++++++++++++++++++ ballista/core/src/lib.rs | 1 + .../core/src/serde/scheduler/from_proto.rs | 2 +- ballista/core/src/utils.rs | 435 +---------------- ballista/executor/src/execution_loop.rs | 2 +- ballista/executor/src/standalone.rs | 3 +- .../scheduler/src/scheduler_server/grpc.rs | 2 +- .../scheduler/src/scheduler_server/mod.rs | 2 +- ballista/scheduler/src/standalone.rs | 2 +- .../src/state/execution_graph_dot.rs | 2 +- ballista/scheduler/src/state/task_manager.rs | 2 +- ballista/scheduler/src/test_utils.rs | 5 +- 13 files changed, 460 insertions(+), 441 deletions(-) create mode 100644 ballista/core/src/extension.rs diff --git a/ballista/client/src/extension.rs b/ballista/client/src/extension.rs index 272f0ca96..89e1c7ce9 100644 --- a/ballista/client/src/extension.rs +++ b/ballista/client/src/extension.rs @@ -15,10 +15,11 @@ // specific language governing permissions and limitations // under the License. -pub use ballista_core::utils::SessionConfigExt; +use ballista_core::extension::SessionConfigHelperExt; +pub use ballista_core::extension::{SessionConfigExt, SessionStateExt}; use ballista_core::{ serde::protobuf::{scheduler_grpc_client::SchedulerGrpcClient, CreateSessionParams}, - utils::{create_grpc_client_connection, SessionStateExt}, + utils::create_grpc_client_connection, }; use datafusion::{ error::DataFusionError, diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs new file mode 100644 index 000000000..728255405 --- /dev/null +++ b/ballista/core/src/extension.rs @@ -0,0 +1,438 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::config::{ + BallistaConfig, BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE, BALLISTA_JOB_NAME, + BALLISTA_STANDALONE_PARALLELISM, +}; +use crate::serde::protobuf::KeyValuePair; +use crate::serde::{BallistaLogicalExtensionCodec, BallistaPhysicalExtensionCodec}; +use crate::utils::BallistaQueryPlanner; +use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState}; +use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +use datafusion::execution::session_state::SessionStateBuilder; +use datafusion_proto::logical_plan::LogicalExtensionCodec; +use datafusion_proto::physical_plan::PhysicalExtensionCodec; +use datafusion_proto::protobuf::LogicalPlanNode; +use std::sync::Arc; + +/// Provides methods which adapt [SessionState] +/// for Ballista usage +pub trait SessionStateExt { + /// Setups new [SessionState] for ballista usage + /// + /// State will be created with appropriate [SessionConfig] configured + fn new_ballista_state( + scheduler_url: String, + session_id: String, + ) -> datafusion::error::Result; + /// Upgrades [SessionState] for ballista usage + /// + /// State will be upgraded to appropriate [SessionConfig] + fn upgrade_for_ballista( + self, + scheduler_url: String, + session_id: String, + ) -> datafusion::error::Result; +} + +/// [SessionConfig] extension with methods needed +/// for Ballista configuration + +pub trait SessionConfigExt { + /// Creates session config which has + /// ballista configuration initialized + fn new_with_ballista() -> SessionConfig; + + /// Overrides ballista's [LogicalExtensionCodec] + fn with_ballista_logical_extension_codec( + self, + codec: Arc, + ) -> SessionConfig; + + /// Overrides ballista's [PhysicalExtensionCodec] + fn with_ballista_physical_extension_codec( + self, + codec: Arc, + ) -> SessionConfig; + + /// returns [LogicalExtensionCodec] if set + /// or default ballista codec if not + fn ballista_logical_extension_codec(&self) -> Arc; + + /// returns [PhysicalExtensionCodec] if set + /// or default ballista codec if not + fn ballista_physical_extension_codec(&self) -> Arc; + + /// Overrides ballista's [QueryPlanner] + fn with_ballista_query_planner( + self, + planner: Arc, + ) -> SessionConfig; + + /// Returns ballista's [QueryPlanner] if overridden + fn ballista_query_planner( + &self, + ) -> Option>; + + /// Returns parallelism of standalone cluster + fn ballista_standalone_parallelism(&self) -> usize; + /// Sets parallelism of standalone cluster + /// + /// This option to be used to configure standalone session context + fn with_ballista_standalone_parallelism(self, parallelism: usize) -> Self; + + /// retrieves grpc client max message size + fn ballista_grpc_client_max_message_size(&self) -> usize; + + /// sets grpc client max message size + fn with_ballista_grpc_client_max_message_size(self, max_size: usize) -> Self; + + /// Sets ballista job name + fn with_ballista_job_name(self, job_name: &str) -> Self; +} + +/// [SessionConfigHelperExt] is set of [SessionConfig] extension methods +/// which are used internally (not exposed in client) +pub trait SessionConfigHelperExt { + /// converts [SessionConfig] to proto + fn to_key_value_pairs(&self) -> Vec; + /// updates [SessionConfig] from proto + fn update_from_key_value_pair(self, key_value_pairs: &[KeyValuePair]) -> Self; + /// updates mut [SessionConfig] from proto + fn update_from_key_value_pair_mut(&mut self, key_value_pairs: &[KeyValuePair]); +} + +impl SessionStateExt for SessionState { + fn new_ballista_state( + scheduler_url: String, + session_id: String, + ) -> datafusion::error::Result { + let config = BallistaConfig::default(); + + let planner = + BallistaQueryPlanner::::new(scheduler_url, config.clone()); + + let session_config = SessionConfig::new() + .with_information_schema(true) + .with_option_extension(config.clone()) + // Ballista disables this option + .with_round_robin_repartition(false); + + let runtime_config = RuntimeConfig::default(); + let runtime_env = RuntimeEnv::new(runtime_config)?; + let session_state = SessionStateBuilder::new() + .with_default_features() + .with_config(session_config) + .with_runtime_env(Arc::new(runtime_env)) + .with_query_planner(Arc::new(planner)) + .with_session_id(session_id) + .build(); + + Ok(session_state) + } + + fn upgrade_for_ballista( + self, + scheduler_url: String, + session_id: String, + ) -> datafusion::error::Result { + let codec_logical = self.config().ballista_logical_extension_codec(); + let planner_override = self.config().ballista_query_planner(); + + let new_config = self + .config() + .options() + .extensions + .get::() + .cloned() + .unwrap_or_else(BallistaConfig::default); + + let session_config = self + .config() + .clone() + .with_option_extension(new_config.clone()) + // Ballista disables this option + .with_round_robin_repartition(false); + + let builder = SessionStateBuilder::new_from_existing(self) + .with_config(session_config) + .with_session_id(session_id); + + let builder = match planner_override { + Some(planner) => builder.with_query_planner(planner), + None => { + let planner = BallistaQueryPlanner::::with_extension( + scheduler_url, + new_config, + codec_logical, + ); + builder.with_query_planner(Arc::new(planner)) + } + }; + + Ok(builder.build()) + } +} + +impl SessionConfigExt for SessionConfig { + fn new_with_ballista() -> SessionConfig { + SessionConfig::new() + .with_option_extension(BallistaConfig::default()) + .with_target_partitions(16) + .with_round_robin_repartition(false) + } + fn with_ballista_logical_extension_codec( + self, + codec: Arc, + ) -> SessionConfig { + let extension = BallistaConfigExtensionLogicalCodec::new(codec); + self.with_extension(Arc::new(extension)) + } + fn with_ballista_physical_extension_codec( + self, + codec: Arc, + ) -> SessionConfig { + let extension = BallistaConfigExtensionPhysicalCodec::new(codec); + self.with_extension(Arc::new(extension)) + } + + fn ballista_logical_extension_codec(&self) -> Arc { + self.get_extension::() + .map(|c| c.codec()) + .unwrap_or_else(|| Arc::new(BallistaLogicalExtensionCodec::default())) + } + fn ballista_physical_extension_codec(&self) -> Arc { + self.get_extension::() + .map(|c| c.codec()) + .unwrap_or_else(|| Arc::new(BallistaPhysicalExtensionCodec::default())) + } + + fn with_ballista_query_planner( + self, + planner: Arc, + ) -> SessionConfig { + let extension = BallistaQueryPlannerExtension::new(planner); + self.with_extension(Arc::new(extension)) + } + + fn ballista_query_planner( + &self, + ) -> Option> { + self.get_extension::() + .map(|c| c.planner()) + } + + fn ballista_standalone_parallelism(&self) -> usize { + self.options() + .extensions + .get::() + .map(|c| c.default_standalone_parallelism()) + .unwrap_or_else(|| BallistaConfig::default().default_standalone_parallelism()) + } + + fn ballista_grpc_client_max_message_size(&self) -> usize { + self.options() + .extensions + .get::() + .map(|c| c.default_grpc_client_max_message_size()) + .unwrap_or_else(|| { + BallistaConfig::default().default_grpc_client_max_message_size() + }) + } + + fn with_ballista_job_name(self, job_name: &str) -> Self { + if self.options().extensions.get::().is_some() { + self.set_str(BALLISTA_JOB_NAME, job_name) + } else { + self.with_option_extension(BallistaConfig::default()) + .set_str(BALLISTA_JOB_NAME, job_name) + } + } + + fn with_ballista_grpc_client_max_message_size(self, max_size: usize) -> Self { + if self.options().extensions.get::().is_some() { + self.set_usize(BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE, max_size) + } else { + self.with_option_extension(BallistaConfig::default()) + .set_usize(BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE, max_size) + } + } + + fn with_ballista_standalone_parallelism(self, parallelism: usize) -> Self { + if self.options().extensions.get::().is_some() { + self.set_usize(BALLISTA_STANDALONE_PARALLELISM, parallelism) + } else { + self.with_option_extension(BallistaConfig::default()) + .set_usize(BALLISTA_STANDALONE_PARALLELISM, parallelism) + } + } +} + +impl SessionConfigHelperExt for SessionConfig { + fn to_key_value_pairs(&self) -> Vec { + self.options() + .entries() + .iter() + .filter(|v| v.value.is_some()) + .map( + // TODO MM make `value` optional value + |datafusion::config::ConfigEntry { key, value, .. }| { + log::trace!( + "sending configuration key: `{}`, value`{:?}`", + key, + value + ); + KeyValuePair { + key: key.to_owned(), + value: value.clone().unwrap(), + } + }, + ) + .collect() + } + + fn update_from_key_value_pair(self, key_value_pairs: &[KeyValuePair]) -> Self { + let mut s = self; + for KeyValuePair { key, value } in key_value_pairs { + log::trace!( + "setting up configuration key: `{}`, value: `{}`", + key, + value + ); + if let Err(e) = s.options_mut().set(key, value) { + log::warn!( + "could not set configuration key: `{}`, value: `{}`, reason: {}", + key, + value, + e.to_string() + ) + } + } + s + } + + fn update_from_key_value_pair_mut(&mut self, key_value_pairs: &[KeyValuePair]) { + for KeyValuePair { key, value } in key_value_pairs { + log::trace!( + "setting up configuration key : `{}`, value: `{}`", + key, + value + ); + if let Err(e) = self.options_mut().set(key, value) { + log::warn!( + "could not set configuration key: `{}`, value: `{}`, reason: {}", + key, + value, + e.to_string() + ) + } + } + } +} + +/// Wrapper for [SessionConfig] extension +/// holding [LogicalExtensionCodec] if overridden +struct BallistaConfigExtensionLogicalCodec { + codec: Arc, +} + +impl BallistaConfigExtensionLogicalCodec { + fn new(codec: Arc) -> Self { + Self { codec } + } + fn codec(&self) -> Arc { + self.codec.clone() + } +} + +/// Wrapper for [SessionConfig] extension +/// holding [PhysicalExtensionCodec] if overridden +struct BallistaConfigExtensionPhysicalCodec { + codec: Arc, +} + +impl BallistaConfigExtensionPhysicalCodec { + fn new(codec: Arc) -> Self { + Self { codec } + } + fn codec(&self) -> Arc { + self.codec.clone() + } +} + +/// Wrapper for [SessionConfig] extension +/// holding overridden [QueryPlanner] +struct BallistaQueryPlannerExtension { + planner: Arc, +} + +impl BallistaQueryPlannerExtension { + fn new(planner: Arc) -> Self { + Self { planner } + } + fn planner(&self) -> Arc { + self.planner.clone() + } +} + +#[cfg(test)] +mod test { + use datafusion::{ + execution::{SessionState, SessionStateBuilder}, + prelude::SessionConfig, + }; + + use crate::{ + config::BALLISTA_JOB_NAME, + extension::{SessionConfigExt, SessionConfigHelperExt, SessionStateExt}, + }; + + // Ballista disables round robin repatriations + #[tokio::test] + async fn should_disable_round_robin_repartition() { + let state = SessionState::new_ballista_state( + "scheduler_url".to_string(), + "session_id".to_string(), + ) + .unwrap(); + + assert!(!state.config().round_robin_repartition()); + + let state = SessionStateBuilder::new().build(); + + assert!(state.config().round_robin_repartition()); + let state = state + .upgrade_for_ballista("scheduler_url".to_string(), "session_id".to_string()) + .unwrap(); + + assert!(!state.config().round_robin_repartition()); + } + #[test] + fn should_convert_to_key_value_pairs() { + // key value pairs should contain datafusion and ballista values + + let config = + SessionConfig::new_with_ballista().with_ballista_job_name("job_name"); + let pairs = config.to_key_value_pairs(); + + assert!(pairs.iter().any(|p| p.key == BALLISTA_JOB_NAME)); + assert!(pairs + .iter() + .any(|p| p.key == "datafusion.catalog.information_schema")) + } +} diff --git a/ballista/core/src/lib.rs b/ballista/core/src/lib.rs index f415af70e..c2d92d353 100644 --- a/ballista/core/src/lib.rs +++ b/ballista/core/src/lib.rs @@ -32,6 +32,7 @@ pub mod consistent_hash; pub mod error; pub mod event_loop; pub mod execution_plans; +pub mod extension; pub mod registry; pub mod utils; diff --git a/ballista/core/src/serde/scheduler/from_proto.rs b/ballista/core/src/serde/scheduler/from_proto.rs index 372c1d910..0257cfa00 100644 --- a/ballista/core/src/serde/scheduler/from_proto.rs +++ b/ballista/core/src/serde/scheduler/from_proto.rs @@ -32,6 +32,7 @@ use std::sync::Arc; use std::time::Duration; use crate::error::BallistaError; +use crate::extension::SessionConfigHelperExt; use crate::serde::scheduler::{ Action, BallistaFunctionRegistry, ExecutorData, ExecutorMetadata, ExecutorSpecification, PartitionId, PartitionLocation, PartitionStats, @@ -39,7 +40,6 @@ use crate::serde::scheduler::{ }; use crate::serde::{protobuf, BallistaCodec}; -use crate::utils::SessionConfigExt; use crate::RuntimeProducer; use protobuf::{operator_metric, NamedCount, NamedGauge, NamedTime}; diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs index 58057733e..bf7533259 100644 --- a/ballista/core/src/utils.rs +++ b/ballista/core/src/utils.rs @@ -15,17 +15,15 @@ // specific language governing permissions and limitations // under the License. -use crate::config::{ - BallistaConfig, BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE, BALLISTA_JOB_NAME, - BALLISTA_STANDALONE_PARALLELISM, -}; +use crate::config::BallistaConfig; use crate::error::{BallistaError, Result}; use crate::execution_plans::{ DistributedQueryExec, ShuffleWriterExec, UnresolvedShuffleExec, }; -use crate::serde::protobuf::KeyValuePair; + +use crate::extension::SessionConfigExt; use crate::serde::scheduler::PartitionStats; -use crate::serde::{BallistaLogicalExtensionCodec, BallistaPhysicalExtensionCodec}; +use crate::serde::BallistaLogicalExtensionCodec; use async_trait::async_trait; use datafusion::arrow::datatypes::Schema; @@ -36,9 +34,7 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::common::tree_node::{TreeNode, TreeNodeVisitor}; use datafusion::datasource::physical_plan::{CsvExec, ParquetExec}; use datafusion::error::DataFusionError; -use datafusion::execution::context::{ - QueryPlanner, SessionConfig, SessionContext, SessionState, -}; +use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState}; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::execution::session_state::SessionStateBuilder; use datafusion::logical_expr::{DdlStatement, LogicalPlan, TableScan}; @@ -54,8 +50,6 @@ use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::{metrics, ExecutionPlan, RecordBatchStream}; use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; use datafusion_proto::logical_plan::{AsLogicalPlan, LogicalExtensionCodec}; -use datafusion_proto::physical_plan::PhysicalExtensionCodec; -use datafusion_proto::protobuf::LogicalPlanNode; use futures::StreamExt; use log::error; use std::io::{BufWriter, Write}; @@ -248,382 +242,6 @@ fn build_exec_plan_diagram( Ok(node_id) } -/// Create a client DataFusion context that uses the BallistaQueryPlanner to send logical plans -/// to a Ballista scheduler -pub fn create_df_ctx_with_ballista_query_planner( - scheduler_url: String, - session_id: String, - config: &BallistaConfig, -) -> SessionContext { - // TODO: put ballista configuration as part of sessions state - // planner can get it from there. - // This would make it changeable during run time - // using SQL SET statement - let planner: Arc> = - Arc::new(BallistaQueryPlanner::new(scheduler_url, config.clone())); - - let session_config = SessionConfig::new_with_ballista() - .with_information_schema(true) - .with_option_extension(config.clone()); - - let session_state = SessionStateBuilder::new() - .with_default_features() - .with_config(session_config) - .with_runtime_env(Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap())) - .with_query_planner(planner) - .with_session_id(session_id) - .build(); - // the SessionContext created here is the client side context, but the session_id is from server side. - SessionContext::new_with_state(session_state) -} - -pub trait SessionStateExt { - fn new_ballista_state( - scheduler_url: String, - session_id: String, - ) -> datafusion::error::Result; - fn upgrade_for_ballista( - self, - scheduler_url: String, - session_id: String, - ) -> datafusion::error::Result; - #[deprecated] - fn ballista_config(&self) -> BallistaConfig; -} - -impl SessionStateExt for SessionState { - fn ballista_config(&self) -> BallistaConfig { - self.config() - .options() - .extensions - .get::() - .cloned() - .unwrap_or_else(BallistaConfig::default) - } - - fn new_ballista_state( - scheduler_url: String, - session_id: String, - ) -> datafusion::error::Result { - let config = BallistaConfig::default(); - - let planner = - BallistaQueryPlanner::::new(scheduler_url, config.clone()); - - let session_config = SessionConfig::new() - .with_information_schema(true) - .with_option_extension(config.clone()) - // Ballista disables this option - .with_round_robin_repartition(false); - - let runtime_config = RuntimeConfig::default(); - let runtime_env = RuntimeEnv::new(runtime_config)?; - let session_state = SessionStateBuilder::new() - .with_default_features() - .with_config(session_config) - .with_runtime_env(Arc::new(runtime_env)) - .with_query_planner(Arc::new(planner)) - .with_session_id(session_id) - .build(); - - Ok(session_state) - } - - fn upgrade_for_ballista( - self, - scheduler_url: String, - session_id: String, - ) -> datafusion::error::Result { - let codec_logical = self.config().ballista_logical_extension_codec(); - let planner_override = self.config().ballista_query_planner(); - - let new_config = self - .config() - .options() - .extensions - .get::() - .cloned() - .unwrap_or_else(BallistaConfig::default); - - let session_config = self - .config() - .clone() - .with_option_extension(new_config.clone()) - // Ballista disables this option - .with_round_robin_repartition(false); - - let builder = SessionStateBuilder::new_from_existing(self) - .with_config(session_config) - .with_session_id(session_id); - - let builder = match planner_override { - Some(planner) => builder.with_query_planner(planner), - None => { - let planner = BallistaQueryPlanner::::with_extension( - scheduler_url, - new_config, - codec_logical, - ); - builder.with_query_planner(Arc::new(planner)) - } - }; - - Ok(builder.build()) - } -} - -pub trait SessionConfigExt { - /// Creates session config which has - /// ballista configuration initialized - fn new_with_ballista() -> SessionConfig; - - /// Overrides ballista's [LogicalExtensionCodec] - fn with_ballista_logical_extension_codec( - self, - codec: Arc, - ) -> SessionConfig; - - /// Overrides ballista's [PhysicalExtensionCodec] - fn with_ballista_physical_extension_codec( - self, - codec: Arc, - ) -> SessionConfig; - - /// returns [LogicalExtensionCodec] if set - /// or default ballista codec if not - fn ballista_logical_extension_codec(&self) -> Arc; - - /// returns [PhysicalExtensionCodec] if set - /// or default ballista codec if not - fn ballista_physical_extension_codec(&self) -> Arc; - - /// Overrides ballista's [QueryPlanner] - fn with_ballista_query_planner( - self, - planner: Arc, - ) -> SessionConfig; - - /// Returns ballista's [QueryPlanner] if overridden - fn ballista_query_planner( - &self, - ) -> Option>; - - fn ballista_standalone_parallelism(&self) -> usize; - - fn ballista_grpc_client_max_message_size(&self) -> usize; - - fn to_key_value_pairs(&self) -> Vec; - - fn update_from_key_value_pair(self, key_value_pairs: &[KeyValuePair]) -> Self; - - fn with_ballista_job_name(self, job_name: &str) -> Self; - - fn with_ballista_grpc_client_max_message_size(self, max_size: usize) -> Self; - - fn with_ballista_standalone_parallelism(self, parallelism: usize) -> Self; - - fn update_from_key_value_pair_mut(&mut self, key_value_pairs: &[KeyValuePair]); -} - -impl SessionConfigExt for SessionConfig { - fn new_with_ballista() -> SessionConfig { - SessionConfig::new() - .with_option_extension(BallistaConfig::default()) - .with_target_partitions(16) - .with_round_robin_repartition(false) - } - fn with_ballista_logical_extension_codec( - self, - codec: Arc, - ) -> SessionConfig { - let extension = BallistaConfigExtensionLogicalCodec::new(codec); - self.with_extension(Arc::new(extension)) - } - fn with_ballista_physical_extension_codec( - self, - codec: Arc, - ) -> SessionConfig { - let extension = BallistaConfigExtensionPhysicalCodec::new(codec); - self.with_extension(Arc::new(extension)) - } - - fn ballista_logical_extension_codec(&self) -> Arc { - self.get_extension::() - .map(|c| c.codec()) - .unwrap_or_else(|| Arc::new(BallistaLogicalExtensionCodec::default())) - } - fn ballista_physical_extension_codec(&self) -> Arc { - self.get_extension::() - .map(|c| c.codec()) - .unwrap_or_else(|| Arc::new(BallistaPhysicalExtensionCodec::default())) - } - - fn with_ballista_query_planner( - self, - planner: Arc, - ) -> SessionConfig { - let extension = BallistaQueryPlannerExtension::new(planner); - self.with_extension(Arc::new(extension)) - } - - fn ballista_query_planner( - &self, - ) -> Option> { - self.get_extension::() - .map(|c| c.planner()) - } - - fn ballista_standalone_parallelism(&self) -> usize { - self.options() - .extensions - .get::() - .map(|c| c.default_standalone_parallelism()) - .unwrap_or_else(|| BallistaConfig::default().default_standalone_parallelism()) - } - - fn ballista_grpc_client_max_message_size(&self) -> usize { - self.options() - .extensions - .get::() - .map(|c| c.default_grpc_client_max_message_size()) - .unwrap_or_else(|| { - BallistaConfig::default().default_grpc_client_max_message_size() - }) - } - - fn to_key_value_pairs(&self) -> Vec { - self.options() - .entries() - .iter() - .filter(|v| v.value.is_some()) - .map( - // TODO MM make `value` optional value - |datafusion::config::ConfigEntry { key, value, .. }| { - log::trace!( - "sending configuration key: `{}`, value`{:?}`", - key, - value - ); - KeyValuePair { - key: key.to_owned(), - value: value.clone().unwrap(), - } - }, - ) - .collect() - } - - fn update_from_key_value_pair(self, key_value_pairs: &[KeyValuePair]) -> Self { - let mut s = self; - for KeyValuePair { key, value } in key_value_pairs { - log::trace!( - "setting up configuration key: `{}`, value: `{}`", - key, - value - ); - if let Err(e) = s.options_mut().set(key, value) { - log::warn!( - "could not set configuration key: `{}`, value: `{}`, reason: {}", - key, - value, - e.to_string() - ) - } - } - s - } - - fn update_from_key_value_pair_mut(&mut self, key_value_pairs: &[KeyValuePair]) { - for KeyValuePair { key, value } in key_value_pairs { - log::trace!( - "setting up configuration key : `{}`, value: `{}`", - key, - value - ); - if let Err(e) = self.options_mut().set(key, value) { - log::warn!( - "could not set configuration key: `{}`, value: `{}`, reason: {}", - key, - value, - e.to_string() - ) - } - } - } - - fn with_ballista_job_name(self, job_name: &str) -> Self { - if self.options().extensions.get::().is_some() { - self.set_str(BALLISTA_JOB_NAME, job_name) - } else { - self.with_option_extension(BallistaConfig::default()) - .set_str(BALLISTA_JOB_NAME, job_name) - } - } - - fn with_ballista_grpc_client_max_message_size(self, max_size: usize) -> Self { - if self.options().extensions.get::().is_some() { - self.set_usize(BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE, max_size) - } else { - self.with_option_extension(BallistaConfig::default()) - .set_usize(BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE, max_size) - } - } - - fn with_ballista_standalone_parallelism(self, parallelism: usize) -> Self { - if self.options().extensions.get::().is_some() { - self.set_usize(BALLISTA_STANDALONE_PARALLELISM, parallelism) - } else { - self.with_option_extension(BallistaConfig::default()) - .set_usize(BALLISTA_STANDALONE_PARALLELISM, parallelism) - } - } -} - -/// Wrapper for [SessionConfig] extension -/// holding [LogicalExtensionCodec] if overridden -struct BallistaConfigExtensionLogicalCodec { - codec: Arc, -} - -impl BallistaConfigExtensionLogicalCodec { - fn new(codec: Arc) -> Self { - Self { codec } - } - fn codec(&self) -> Arc { - self.codec.clone() - } -} - -/// Wrapper for [SessionConfig] extension -/// holding [PhysicalExtensionCodec] if overridden -struct BallistaConfigExtensionPhysicalCodec { - codec: Arc, -} - -impl BallistaConfigExtensionPhysicalCodec { - fn new(codec: Arc) -> Self { - Self { codec } - } - fn codec(&self) -> Arc { - self.codec.clone() - } -} - -/// Wrapper for [SessionConfig] extension -/// holding overridden [QueryPlanner] -struct BallistaQueryPlannerExtension { - planner: Arc, -} - -impl BallistaQueryPlannerExtension { - fn new(planner: Arc) -> Self { - Self { planner } - } - fn planner(&self) -> Arc { - self.planner.clone() - } -} - pub struct BallistaQueryPlanner { scheduler_url: String, config: BallistaConfig, @@ -815,17 +433,12 @@ mod test { error::Result, execution::{ runtime_env::{RuntimeConfig, RuntimeEnv}, - SessionState, SessionStateBuilder, + SessionStateBuilder, }, prelude::{SessionConfig, SessionContext}, }; - use crate::{ - config::BALLISTA_JOB_NAME, - utils::{LocalRun, SessionStateExt}, - }; - - use super::SessionConfigExt; + use crate::utils::LocalRun; fn context() -> SessionContext { let runtime_environment = RuntimeEnv::new(RuntimeConfig::new()).unwrap(); @@ -902,38 +515,4 @@ mod test { Ok(()) } - - // Ballista disables round robin repatriations - #[tokio::test] - async fn should_disable_round_robin_repartition() { - let state = SessionState::new_ballista_state( - "scheduler_url".to_string(), - "session_id".to_string(), - ) - .unwrap(); - - assert!(!state.config().round_robin_repartition()); - - let state = SessionStateBuilder::new().build(); - - assert!(state.config().round_robin_repartition()); - let state = state - .upgrade_for_ballista("scheduler_url".to_string(), "session_id".to_string()) - .unwrap(); - - assert!(!state.config().round_robin_repartition()); - } - #[test] - fn should_convert_to_key_value_pairs() { - // key value pairs should contain datafusion and ballista values - - let config = - SessionConfig::new_with_ballista().with_ballista_job_name("job_name"); - let pairs = config.to_key_value_pairs(); - - assert!(pairs.iter().any(|p| p.key == BALLISTA_JOB_NAME)); - assert!(pairs - .iter() - .any(|p| p.key == "datafusion.catalog.information_schema")) - } } diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs index 402ad2736..649b366b4 100644 --- a/ballista/executor/src/execution_loop.rs +++ b/ballista/executor/src/execution_loop.rs @@ -19,13 +19,13 @@ use crate::cpu_bound_executor::DedicatedExecutor; use crate::executor::Executor; use crate::{as_task_status, TaskExecutionTimes}; use ballista_core::error::BallistaError; +use ballista_core::extension::SessionConfigHelperExt; use ballista_core::serde::protobuf::{ scheduler_grpc_client::SchedulerGrpcClient, PollWorkParams, PollWorkResult, TaskDefinition, TaskStatus, }; use ballista_core::serde::scheduler::{ExecutorSpecification, PartitionId}; use ballista_core::serde::BallistaCodec; -use ballista_core::utils::SessionConfigExt; use datafusion::execution::context::TaskContext; use datafusion::physical_plan::ExecutionPlan; use datafusion_proto::logical_plan::AsLogicalPlan; diff --git a/ballista/executor/src/standalone.rs b/ballista/executor/src/standalone.rs index 5c60faf9b..2c2906f07 100644 --- a/ballista/executor/src/standalone.rs +++ b/ballista/executor/src/standalone.rs @@ -19,8 +19,9 @@ use crate::metrics::LoggingMetricsCollector; use crate::{execution_loop, executor::Executor, flight_service::BallistaFlightService}; use arrow_flight::flight_service_server::FlightServiceServer; use ballista_core::config::BallistaConfig; +use ballista_core::extension::SessionConfigExt; use ballista_core::registry::BallistaFunctionRegistry; -use ballista_core::utils::{default_config_producer, SessionConfigExt}; +use ballista_core::utils::default_config_producer; use ballista_core::{ error::Result, serde::protobuf::{scheduler_grpc_client::SchedulerGrpcClient, ExecutorRegistration}, diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs index b03a99307..52cdc9857 100644 --- a/ballista/scheduler/src/scheduler_server/grpc.rs +++ b/ballista/scheduler/src/scheduler_server/grpc.rs @@ -17,6 +17,7 @@ use axum::extract::ConnectInfo; use ballista_core::config::BALLISTA_JOB_NAME; +use ballista_core::extension::SessionConfigHelperExt; use ballista_core::serde::protobuf::execute_query_params::{OptionalSessionId, Query}; use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc; use ballista_core::serde::protobuf::{ @@ -31,7 +32,6 @@ use ballista_core::serde::protobuf::{ UpdateTaskStatusParams, UpdateTaskStatusResult, }; use ballista_core::serde::scheduler::ExecutorMetadata; -use ballista_core::utils::SessionConfigExt; use datafusion_proto::logical_plan::AsLogicalPlan; use datafusion_proto::physical_plan::AsExecutionPlan; use log::{debug, error, info, trace, warn}; diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs index 5fa222595..b6eeafda6 100644 --- a/ballista/scheduler/src/scheduler_server/mod.rs +++ b/ballista/scheduler/src/scheduler_server/mod.rs @@ -346,7 +346,7 @@ pub fn timestamp_millis() -> u64 { mod test { use std::sync::Arc; - use ballista_core::utils::SessionConfigExt; + use ballista_core::extension::SessionConfigExt; use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::functions_aggregate::sum::sum; use datafusion::logical_expr::{col, LogicalPlan}; diff --git a/ballista/scheduler/src/standalone.rs b/ballista/scheduler/src/standalone.rs index 1e7d93844..9ad887c60 100644 --- a/ballista/scheduler/src/standalone.rs +++ b/ballista/scheduler/src/standalone.rs @@ -19,10 +19,10 @@ use crate::cluster::BallistaCluster; use crate::config::SchedulerConfig; use crate::metrics::default_metrics_collector; use crate::scheduler_server::SchedulerServer; +use ballista_core::extension::SessionConfigExt; use ballista_core::serde::BallistaCodec; use ballista_core::utils::{ create_grpc_server, default_config_producer, default_session_builder, - SessionConfigExt, }; use ballista_core::ConfigProducer; use ballista_core::{ diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs b/ballista/scheduler/src/state/execution_graph_dot.rs index f2c9bf1d8..68a2ebdfc 100644 --- a/ballista/scheduler/src/state/execution_graph_dot.rs +++ b/ballista/scheduler/src/state/execution_graph_dot.rs @@ -418,7 +418,7 @@ mod tests { use crate::state::execution_graph::ExecutionGraph; use crate::state::execution_graph_dot::ExecutionGraphDot; use ballista_core::error::{BallistaError, Result}; - use ballista_core::utils::SessionConfigExt; + use ballista_core::extension::SessionConfigExt; use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::datasource::MemTable; use datafusion::prelude::{SessionConfig, SessionContext}; diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs index 11b99ae57..2e5b76b48 100644 --- a/ballista/scheduler/src/state/task_manager.rs +++ b/ballista/scheduler/src/state/task_manager.rs @@ -24,7 +24,7 @@ use crate::state::executor_manager::ExecutorManager; use ballista_core::error::BallistaError; use ballista_core::error::Result; -use ballista_core::utils::SessionConfigExt; +use ballista_core::extension::SessionConfigHelperExt; use datafusion::prelude::SessionConfig; use crate::cluster::JobState; diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs index 34f7076f5..629cc285b 100644 --- a/ballista/scheduler/src/test_utils.rs +++ b/ballista/scheduler/src/test_utils.rs @@ -16,6 +16,7 @@ // under the License. use ballista_core::error::{BallistaError, Result}; +use ballista_core::extension::SessionConfigExt; use datafusion::catalog::Session; use std::any::Any; use std::collections::HashMap; @@ -56,9 +57,7 @@ use crate::cluster::BallistaCluster; use crate::scheduler_server::event::QueryStageSchedulerEvent; use crate::state::execution_graph::{ExecutionGraph, ExecutionStage, TaskDescription}; -use ballista_core::utils::{ - default_config_producer, default_session_builder, SessionConfigExt, -}; +use ballista_core::utils::{default_config_producer, default_session_builder}; use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode}; use parking_lot::Mutex; use tokio::sync::mpsc::{channel, Receiver, Sender};