Skip to content

Commit

Permalink
Retire BallistaContext ...
Browse files Browse the repository at this point in the history
... replace it with SessionContext.
  • Loading branch information
milenkovicm committed Oct 10, 2024
1 parent 49fe190 commit 2f2fe84
Show file tree
Hide file tree
Showing 12 changed files with 311 additions and 269 deletions.
1 change: 1 addition & 0 deletions ballista/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ parking_lot = { workspace = true }
sqlparser = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }
async-trait = { workspace = true }

[features]
azure = ["ballista-core/azure"]
Expand Down
165 changes: 0 additions & 165 deletions ballista/client/src/columnar_batch.rs

This file was deleted.

5 changes: 5 additions & 0 deletions ballista/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl BallistaContextState {
}
}

#[deprecated]
pub struct BallistaContext {
state: Arc<Mutex<BallistaContextState>>,
context: Arc<SessionContext>,
Expand Down Expand Up @@ -478,6 +479,10 @@ impl BallistaContext {
let ctx = self.context.clone();
ctx.execute_logical_plan(plan).await
}

pub fn ctx(&self) -> SessionContext {
self.context().clone()
}
}

#[cfg(test)]
Expand Down
142 changes: 142 additions & 0 deletions ballista/client/src/ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
use ballista_core::{
config::BallistaConfig,
serde::protobuf::{
scheduler_grpc_client::SchedulerGrpcClient, CreateSessionParams, KeyValuePair,
},
utils::{create_df_ctx_with_ballista_query_planner, create_grpc_client_connection},
};
use datafusion::{error::DataFusionError, prelude::SessionContext};
use datafusion_proto::protobuf::LogicalPlanNode;

#[async_trait::async_trait]
pub trait BallistaExt {
async fn ballista_standalone(
config: &BallistaConfig,
) -> datafusion::error::Result<SessionContext>;

async fn remote(
host: &str,
port: u16,
config: &BallistaConfig,
) -> datafusion::error::Result<SessionContext>;
}

#[async_trait::async_trait]
impl BallistaExt for SessionContext {
async fn remote(
host: &str,
port: u16,
config: &BallistaConfig,
) -> datafusion::error::Result<SessionContext> {
let scheduler_url = format!("http://{}:{}", &host, port);
log::info!(
"Connecting to Ballista scheduler at {}",
scheduler_url.clone()
);
let connection = create_grpc_client_connection(scheduler_url.clone())
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;

let limit = config.default_grpc_client_max_message_size();
let mut scheduler = SchedulerGrpcClient::new(connection)
.max_encoding_message_size(limit)
.max_decoding_message_size(limit);

let remote_session_id = scheduler
.create_session(CreateSessionParams {
settings: config
.settings()
.iter()
.map(|(k, v)| KeyValuePair {
key: k.to_owned(),
value: v.to_owned(),
})
.collect::<Vec<_>>(),
})
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
.into_inner()
.session_id;

log::info!(
"Server side SessionContext created with session id: {}",
remote_session_id
);

let ctx = {
create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
scheduler_url,
remote_session_id,
&config,
)
};

Ok(ctx)
}

async fn ballista_standalone(
config: &BallistaConfig,
) -> datafusion::error::Result<Self> {
use ballista_core::serde::BallistaCodec;
use datafusion_proto::protobuf::PhysicalPlanNode;

log::info!("Running in local mode. Scheduler will be run in-proc");

let addr = ballista_scheduler::standalone::new_standalone_scheduler()
.await
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;

let scheduler_url = format!("http://localhost:{}", addr.port());
let mut scheduler = loop {
match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
Err(_) => {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
log::info!("Attempting to connect to in-proc scheduler...");
}
Ok(scheduler) => break scheduler,
}
};

let remote_session_id = scheduler
.create_session(CreateSessionParams {
settings: config
.settings()
.iter()
.map(|(k, v)| KeyValuePair {
key: k.to_owned(),
value: v.to_owned(),
})
.collect::<Vec<_>>(),
})
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
.into_inner()
.session_id;

log::info!(
"Server side SessionContext created with session id: {}",
remote_session_id
);

let ctx = {
create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
scheduler_url,
remote_session_id,
&config,
)
};

let default_codec: BallistaCodec<LogicalPlanNode, PhysicalPlanNode> =
BallistaCodec::default();

let parallelism = std::thread::available_parallelism()
.map(|v| v.get())
.unwrap_or(2);

ballista_executor::new_standalone_executor(scheduler, parallelism, default_codec)
.await
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;

Ok(ctx)
}
}
2 changes: 1 addition & 1 deletion ballista/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@

#![doc = include_str!("../README.md")]

pub mod columnar_batch;
pub mod context;
mod ext;
pub mod prelude;
1 change: 1 addition & 0 deletions ballista/client/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ pub use ballista_core::{
pub use futures::StreamExt;

pub use crate::context::BallistaContext;
pub use crate::ext::BallistaExt;
Loading

0 comments on commit 2f2fe84

Please sign in to comment.