Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trim down BallistaConfig #1108

Merged
merged 19 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions ballista-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ use std::path::Path;

use ballista::{
extension::SessionConfigExt,
prelude::{
Result, SessionContextExt, BALLISTA_DEFAULT_BATCH_SIZE,
BALLISTA_STANDALONE_PARALLELISM, BALLISTA_WITH_INFORMATION_SCHEMA,
},
prelude::{Result, SessionContextExt},
};
use ballista_cli::{
exec, print_format::PrintFormat, print_options::PrintOptions, BALLISTA_CLI_VERSION,
Expand Down Expand Up @@ -118,12 +115,11 @@ pub async fn main() -> Result<()> {
env::set_current_dir(p).unwrap();
};

let mut ballista_config = SessionConfig::new_with_ballista()
.set_str(BALLISTA_WITH_INFORMATION_SCHEMA, "true");
let mut ballista_config =
SessionConfig::new_with_ballista().with_information_schema(true);

if let Some(batch_size) = args.batch_size {
ballista_config =
ballista_config.set_str(BALLISTA_DEFAULT_BATCH_SIZE, &batch_size.to_string());
ballista_config = ballista_config.with_batch_size(batch_size);
};

let ctx = match (args.host, args.port) {
Expand All @@ -139,10 +135,8 @@ pub async fn main() -> Result<()> {
}
_ => {
if let Some(concurrent_tasks) = args.concurrent_tasks {
ballista_config = ballista_config.set_str(
BALLISTA_STANDALONE_PARALLELISM,
&concurrent_tasks.to_string(),
);
ballista_config =
ballista_config.with_target_partitions(concurrent_tasks);
};
let state = SessionStateBuilder::new()
.with_config(ballista_config)
Expand Down
4 changes: 1 addition & 3 deletions ballista/client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,7 @@ use datafusion::functions_aggregate::{min_max::min, min_max::max, sum::sum, aver
#[tokio::main]
async fn main() -> Result<()> {
// create configuration
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "4")
.build()?;
let config = BallistaConfig::default();

// connect to Ballista scheduler
let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
Expand Down
59 changes: 14 additions & 45 deletions ballista/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
use ballista_core::serde::protobuf::{CreateSessionParams, KeyValuePair};
use ballista_core::utils::{
create_df_ctx_with_ballista_query_planner, create_grpc_client_connection,
SessionConfigExt,
};
use datafusion_proto::protobuf::LogicalPlanNode;

Expand Down Expand Up @@ -360,11 +361,8 @@ impl BallistaContext {
let is_show = self.is_show_statement(sql).await?;
// `SHOW TABLES` and `SHOW COLUMNS` statements cannot run on the scheduler because the tables are stored on the client
if is_show {
let state = self.state.lock();
ctx = Arc::new(SessionContext::new_with_config(
SessionConfig::new().with_information_schema(
state.config.default_with_information_schema(),
),
SessionConfig::new_with_ballista(),
));
}

Expand Down Expand Up @@ -485,13 +483,11 @@ impl BallistaContext {
#[cfg(test)]
#[cfg(feature = "standalone")]
mod standalone_tests {
use ballista_core::config::BallistaConfig;
use datafusion::arrow;
use datafusion::arrow::util::pretty::pretty_format_batches;

use crate::context::BallistaContext;
use ballista_core::config::{
BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA,
};
use ballista_core::error::Result;
use datafusion::config::TableParquetOptions;
use datafusion::dataframe::DataFrameWriteOptions;
Expand All @@ -502,7 +498,7 @@ mod standalone_tests {
#[tokio::test]
async fn test_standalone_mode() {
use super::*;
let context = BallistaContext::standalone(&BallistaConfig::new().unwrap(), 1)
let context = BallistaContext::standalone(&BallistaConfig::default(), 1)
.await
.unwrap();
let df = context.sql("SELECT 1;").await.unwrap();
Expand All @@ -512,8 +508,7 @@ mod standalone_tests {
#[tokio::test]
async fn test_write_parquet() -> Result<()> {
use super::*;
let context =
BallistaContext::standalone(&BallistaConfig::new().unwrap(), 1).await?;
let context = BallistaContext::standalone(&BallistaConfig::default(), 1).await?;
let df = context.sql("SELECT 1;").await?;
let tmp_dir = TempDir::new().unwrap();
let file_path = format!(
Expand All @@ -532,8 +527,7 @@ mod standalone_tests {
#[tokio::test]
async fn test_write_csv() -> Result<()> {
use super::*;
let context =
BallistaContext::standalone(&BallistaConfig::new().unwrap(), 1).await?;
let context = BallistaContext::standalone(&BallistaConfig::default(), 1).await?;
let df = context.sql("SELECT 1;").await?;
let tmp_dir = TempDir::new().unwrap();
let file_path =
Expand All @@ -549,7 +543,7 @@ mod standalone_tests {
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;
let context = BallistaContext::standalone(&BallistaConfig::new().unwrap(), 1)
let context = BallistaContext::standalone(&BallistaConfig::default(), 1)
.await
.unwrap();

Expand Down Expand Up @@ -587,18 +581,14 @@ mod standalone_tests {
}

#[tokio::test]
#[ignore = "this one fails after config change (will be removed)"]
async fn test_show_tables_not_with_information_schema() {
use super::*;
use ballista_core::config::{
BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA,
};

use std::fs::File;
use std::io::Write;
use tempfile::TempDir;
let config = BallistaConfigBuilder::default()
.set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
.build()
.unwrap();
let config = BallistaConfig::default();
let context = BallistaContext::standalone(&config, 1).await.unwrap();

let data = "Jorge,2018-12-13T12:12:10.011Z\n\
Expand Down Expand Up @@ -643,13 +633,7 @@ mod standalone_tests {
ListingOptions, ListingTable, ListingTableConfig,
};

use ballista_core::config::{
BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA,
};
let config = BallistaConfigBuilder::default()
.set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
.build()
.unwrap();
let config = BallistaConfig::default();
let context = BallistaContext::standalone(&config, 1).await.unwrap();

context
Expand Down Expand Up @@ -711,14 +695,8 @@ mod standalone_tests {
#[tokio::test]
async fn test_empty_exec_with_one_row() {
use crate::context::BallistaContext;
use ballista_core::config::{
BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA,
};

let config = BallistaConfigBuilder::default()
.set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
.build()
.unwrap();
let config = BallistaConfig::default();
let context = BallistaContext::standalone(&config, 1).await.unwrap();

let sql = "select EXTRACT(year FROM to_timestamp('2020-09-08T12:13:14+00:00'));";
Expand All @@ -730,14 +708,8 @@ mod standalone_tests {
#[tokio::test]
async fn test_union_and_union_all() {
use super::*;
use ballista_core::config::{
BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA,
};
use datafusion::arrow::util::pretty::pretty_format_batches;
let config = BallistaConfigBuilder::default()
.set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
.build()
.unwrap();
let config = BallistaConfig::default();
let context = BallistaContext::standalone(&config, 1).await.unwrap();

let df = context
Expand Down Expand Up @@ -1056,10 +1028,7 @@ mod standalone_tests {
);
}
async fn create_test_context() -> BallistaContext {
let config = BallistaConfigBuilder::default()
.set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
.build()
.unwrap();
let config = BallistaConfig::default();
let context = BallistaContext::standalone(&config, 4).await.unwrap();

context
Expand Down
54 changes: 18 additions & 36 deletions ballista/client/src/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@

pub use ballista_core::utils::SessionConfigExt;
use ballista_core::{
config::BallistaConfig,
serde::protobuf::{
scheduler_grpc_client::SchedulerGrpcClient, CreateSessionParams, KeyValuePair,
},
serde::protobuf::{scheduler_grpc_client::SchedulerGrpcClient, CreateSessionParams},
utils::{create_grpc_client_connection, SessionStateExt},
};
use datafusion::{
error::DataFusionError, execution::SessionState, prelude::SessionContext,
error::DataFusionError,
execution::SessionState,
prelude::{SessionConfig, SessionContext},
};
use url::Url;

Expand Down Expand Up @@ -100,7 +99,7 @@ impl SessionContextExt for SessionContext {
url: &str,
state: SessionState,
) -> datafusion::error::Result<SessionContext> {
let config = state.ballista_config();
let config = state.config();

let scheduler_url = Extension::parse_url(url)?;
log::info!(
Expand All @@ -120,15 +119,14 @@ impl SessionContextExt for SessionContext {
}

async fn remote(url: &str) -> datafusion::error::Result<SessionContext> {
let config = BallistaConfig::new()
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;
let config = SessionConfig::new_with_ballista();
let scheduler_url = Extension::parse_url(url)?;
log::info!(
"Connecting to Ballista scheduler at {}",
scheduler_url.clone()
);
let remote_session_id =
Extension::setup_remote(config, scheduler_url.clone()).await?;
Extension::setup_remote(&config, scheduler_url.clone()).await?;
log::info!(
"Server side SessionContext created with session id: {}",
remote_session_id
Expand All @@ -143,10 +141,8 @@ impl SessionContextExt for SessionContext {
async fn standalone_with_state(
state: SessionState,
) -> datafusion::error::Result<SessionContext> {
let config = state.ballista_config();

let (remote_session_id, scheduler_url) =
Extension::setup_standalone(config, Some(&state)).await?;
Extension::setup_standalone(Some(&state)).await?;

let session_state =
state.upgrade_for_ballista(scheduler_url, remote_session_id.clone())?;
Expand All @@ -162,11 +158,9 @@ impl SessionContextExt for SessionContext {
#[cfg(feature = "standalone")]
async fn standalone() -> datafusion::error::Result<Self> {
log::info!("Running in local mode. Scheduler will be run in-proc");
let config = BallistaConfig::new()
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;

let (remote_session_id, scheduler_url) =
Extension::setup_standalone(config, None).await?;
Extension::setup_standalone(None).await?;

let session_state =
SessionState::new_ballista_state(scheduler_url, remote_session_id.clone())?;
Expand Down Expand Up @@ -197,10 +191,9 @@ impl Extension {

#[cfg(feature = "standalone")]
async fn setup_standalone(
config: BallistaConfig,
session_state: Option<&SessionState>,
) -> datafusion::error::Result<(String, String)> {
use ballista_core::serde::BallistaCodec;
use ballista_core::{serde::BallistaCodec, utils::default_config_producer};

let addr = match session_state {
None => ballista_scheduler::standalone::new_standalone_scheduler()
Expand All @@ -214,6 +207,9 @@ impl Extension {
.map_err(|e| DataFusionError::Configuration(e.to_string()))?
}
};
let config = session_state
.map(|s| s.config().clone())
.unwrap_or_else(default_config_producer);

let scheduler_url = format!("http://localhost:{}", addr.port());

Expand All @@ -229,21 +225,14 @@ impl Extension {

let remote_session_id = scheduler
.create_session(CreateSessionParams {
settings: config
.settings()
.iter()
.map(|(k, v)| KeyValuePair {
key: k.to_owned(),
value: v.to_owned(),
})
.collect::<Vec<_>>(),
settings: config.to_key_value_pairs(),
})
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
.into_inner()
.session_id;

let concurrent_tasks = config.default_standalone_parallelism();
let concurrent_tasks = config.ballista_standalone_parallelism();

match session_state {
None => {
Expand All @@ -269,28 +258,21 @@ impl Extension {
}

async fn setup_remote(
config: BallistaConfig,
config: &SessionConfig,
scheduler_url: String,
) -> datafusion::error::Result<String> {
let connection = create_grpc_client_connection(scheduler_url.clone())
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;

let limit = config.default_grpc_client_max_message_size();
let limit = config.ballista_grpc_client_max_message_size();
let mut scheduler = SchedulerGrpcClient::new(connection)
.max_encoding_message_size(limit)
.max_decoding_message_size(limit);

let remote_session_id = scheduler
.create_session(CreateSessionParams {
settings: config
.settings()
.iter()
.map(|(k, v)| KeyValuePair {
key: k.to_owned(),
value: v.to_owned(),
})
.collect::<Vec<_>>(),
settings: config.to_key_value_pairs(),
})
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
Expand Down
8 changes: 1 addition & 7 deletions ballista/client/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,7 @@
//! Ballista Prelude (common imports)

pub use ballista_core::{
config::{
BallistaConfig, BALLISTA_COLLECT_STATISTICS, BALLISTA_DEFAULT_BATCH_SIZE,
BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE,
BALLISTA_JOB_NAME, BALLISTA_PARQUET_PRUNING, BALLISTA_REPARTITION_AGGREGATIONS,
BALLISTA_REPARTITION_JOINS, BALLISTA_REPARTITION_WINDOWS,
BALLISTA_STANDALONE_PARALLELISM, BALLISTA_WITH_INFORMATION_SCHEMA,
},
config::BallistaConfig,
error::{BallistaError, Result},
};

Expand Down
Loading
Loading