Skip to content

Commit

Permalink
Initial session store implementation for extensions (#1096)
Browse files Browse the repository at this point in the history
  • Loading branch information
milenkovicm authored Oct 26, 2024
1 parent 95e925a commit 5226595
Show file tree
Hide file tree
Showing 6 changed files with 769 additions and 72 deletions.
2 changes: 2 additions & 0 deletions ballista/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ tokio = { workspace = true }
url = { version = "2.5" }

[dev-dependencies]
ballista-executor = { path = "../executor", version = "0.12.0" }
ballista-scheduler = { path = "../scheduler", version = "0.12.0" }
ctor = { version = "0.2" }
env_logger = { workspace = true }

Expand Down
225 changes: 155 additions & 70 deletions ballista/client/src/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@
// specific language governing permissions and limitations
// under the License.

pub use ballista_core::utils::BallistaSessionConfigExt;
use ballista_core::{
config::BallistaConfig,
serde::protobuf::{
scheduler_grpc_client::SchedulerGrpcClient, CreateSessionParams, KeyValuePair,
},
utils::{create_df_ctx_with_ballista_query_planner, create_grpc_client_connection},
utils::{create_grpc_client_connection, BallistaSessionStateExt},
};
use datafusion::{
error::DataFusionError, execution::SessionState, prelude::SessionContext,
};
use datafusion::{error::DataFusionError, prelude::SessionContext};
use datafusion_proto::protobuf::LogicalPlanNode;
use url::Url;

const DEFAULT_SCHEDULER_PORT: u16 = 50050;
Expand Down Expand Up @@ -65,86 +67,155 @@ const DEFAULT_SCHEDULER_PORT: u16 = 50050;
///
#[async_trait::async_trait]
pub trait SessionContextExt {
/// Create a context for executing queries against a standalone Ballista scheduler instance
/// Creates a context for executing queries against a standalone Ballista scheduler instance
///
/// It wills start local ballista cluster with scheduler and executor.
#[cfg(feature = "standalone")]
async fn standalone() -> datafusion::error::Result<SessionContext>;

/// Create a context for executing queries against a remote Ballista scheduler instance
/// Creates a context for executing queries against a standalone Ballista scheduler instance
/// with custom session state.
///
/// It wills start local ballista cluster with scheduler and executor.
#[cfg(feature = "standalone")]
async fn standalone_with_state(
state: SessionState,
) -> datafusion::error::Result<SessionContext>;

/// Creates a context for executing queries against a remote Ballista scheduler instance
async fn remote(url: &str) -> datafusion::error::Result<SessionContext>;

/// Creates a context for executing queries against a remote Ballista scheduler instance
/// with custom session state
async fn remote_with_state(
url: &str,
state: SessionState,
) -> datafusion::error::Result<SessionContext>;
}

#[async_trait::async_trait]
impl SessionContextExt for SessionContext {
async fn remote(url: &str) -> datafusion::error::Result<SessionContext> {
let url =
Url::parse(url).map_err(|e| DataFusionError::Configuration(e.to_string()))?;
let host = url.host().ok_or(DataFusionError::Configuration(
"hostname should be provided".to_string(),
))?;
let port = url.port().unwrap_or(DEFAULT_SCHEDULER_PORT);
let scheduler_url = format!("http://{}:{}", &host, port);
async fn remote_with_state(
url: &str,
state: SessionState,
) -> datafusion::error::Result<SessionContext> {
let config = state.ballista_config();

let scheduler_url = Extension::parse_url(url)?;
log::info!(
"Connecting to Ballista scheduler at {}",
scheduler_url.clone()
);
let connection = create_grpc_client_connection(scheduler_url.clone())
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
let remote_session_id =
Extension::setup_remote(config, scheduler_url.clone()).await?;
log::info!(
"Server side SessionContext created with session id: {}",
remote_session_id
);
let session_state =
state.upgrade_for_ballista(scheduler_url, remote_session_id)?;

Ok(SessionContext::new_with_state(session_state))
}

let config = BallistaConfig::builder()
.build()
async fn remote(url: &str) -> datafusion::error::Result<SessionContext> {
let config = BallistaConfig::new()
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;
let scheduler_url = Extension::parse_url(url)?;
log::info!(
"Connecting to Ballista scheduler at {}",
scheduler_url.clone()
);
let remote_session_id =
Extension::setup_remote(config, scheduler_url.clone()).await?;
log::info!(
"Server side SessionContext created with session id: {}",
remote_session_id
);
let session_state =
SessionState::new_ballista_state(scheduler_url, remote_session_id)?;

let limit = config.default_grpc_client_max_message_size();
let mut scheduler = SchedulerGrpcClient::new(connection)
.max_encoding_message_size(limit)
.max_decoding_message_size(limit);
Ok(SessionContext::new_with_state(session_state))
}

let remote_session_id = scheduler
.create_session(CreateSessionParams {
settings: config
.settings()
.iter()
.map(|(k, v)| KeyValuePair {
key: k.to_owned(),
value: v.to_owned(),
})
.collect::<Vec<_>>(),
})
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
.into_inner()
.session_id;
#[cfg(feature = "standalone")]
async fn standalone_with_state(
state: SessionState,
) -> datafusion::error::Result<SessionContext> {
let config = state.ballista_config();

let codec_logical = state.config().ballista_logical_extension_codec();
let codec_physical = state.config().ballista_physical_extension_codec();

let ballista_codec =
ballista_core::serde::BallistaCodec::new(codec_logical, codec_physical);

let (remote_session_id, scheduler_url) =
Extension::setup_standalone(config, ballista_codec).await?;

let session_state =
state.upgrade_for_ballista(scheduler_url, remote_session_id.clone())?;

log::info!(
"Server side SessionContext created with session id: {}",
remote_session_id
);

let ctx = {
create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
scheduler_url,
remote_session_id,
&config,
)
};

Ok(ctx)
Ok(SessionContext::new_with_state(session_state))
}

#[cfg(feature = "standalone")]
async fn standalone() -> datafusion::error::Result<Self> {
use ballista_core::serde::BallistaCodec;
use datafusion_proto::protobuf::PhysicalPlanNode;

log::info!("Running in local mode. Scheduler will be run in-proc");
let config = BallistaConfig::new()
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;

let ballista_codec = ballista_core::serde::BallistaCodec::default();

let (remote_session_id, scheduler_url) =
Extension::setup_standalone(config, ballista_codec).await?;

let session_state =
SessionState::new_ballista_state(scheduler_url, remote_session_id.clone())?;

log::info!(
"Server side SessionContext created with session id: {}",
remote_session_id
);

Ok(SessionContext::new_with_state(session_state))
}
}

struct Extension {}

impl Extension {
fn parse_url(url: &str) -> datafusion::error::Result<String> {
let url =
Url::parse(url).map_err(|e| DataFusionError::Configuration(e.to_string()))?;
let host = url.host().ok_or(DataFusionError::Configuration(
"hostname should be provided".to_string(),
))?;
let port = url.port().unwrap_or(DEFAULT_SCHEDULER_PORT);
let scheduler_url = format!("http://{}:{}", &host, port);

Ok(scheduler_url)
}

#[cfg(feature = "standalone")]
async fn setup_standalone(
config: BallistaConfig,
ballista_codec: ballista_core::serde::BallistaCodec<
datafusion_proto::protobuf::LogicalPlanNode,
datafusion_proto::protobuf::PhysicalPlanNode,
>,
) -> datafusion::error::Result<(String, String)> {
let addr = ballista_scheduler::standalone::new_standalone_scheduler()
.await
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;

let scheduler_url = format!("http://localhost:{}", addr.port());

let mut scheduler = loop {
match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
Err(_) => {
Expand All @@ -154,9 +225,7 @@ impl SessionContextExt for SessionContext {
Ok(scheduler) => break scheduler,
}
};
let config = BallistaConfig::builder()
.build()
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;

let remote_session_id = scheduler
.create_session(CreateSessionParams {
settings: config
Expand All @@ -173,31 +242,47 @@ impl SessionContextExt for SessionContext {
.into_inner()
.session_id;

log::info!(
"Server side SessionContext created with session id: {}",
remote_session_id
);

let ctx = {
create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
scheduler_url,
remote_session_id,
&config,
)
};

let default_codec: BallistaCodec<LogicalPlanNode, PhysicalPlanNode> =
BallistaCodec::default();

let concurrent_tasks = config.default_standalone_parallelism();
ballista_executor::new_standalone_executor(
scheduler,
concurrent_tasks,
default_codec,
ballista_codec,
)
.await
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;

Ok(ctx)
Ok((remote_session_id, scheduler_url))
}

async fn setup_remote(
config: BallistaConfig,
scheduler_url: String,
) -> datafusion::error::Result<String> {
let connection = create_grpc_client_connection(scheduler_url.clone())
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;

let limit = config.default_grpc_client_max_message_size();
let mut scheduler = SchedulerGrpcClient::new(connection)
.max_encoding_message_size(limit)
.max_decoding_message_size(limit);

let remote_session_id = scheduler
.create_session(CreateSessionParams {
settings: config
.settings()
.iter()
.map(|(k, v)| KeyValuePair {
key: k.to_owned(),
value: v.to_owned(),
})
.collect::<Vec<_>>(),
})
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
.into_inner()
.session_id;

Ok(remote_session_id)
}
}
Loading

0 comments on commit 5226595

Please sign in to comment.