Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Executor configuration extended .. #1099

Merged
merged 12 commits into from
Oct 31, 2024
3 changes: 3 additions & 0 deletions ballista/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,12 @@ ballista-executor = { path = "../executor", version = "0.12.0" }
ballista-scheduler = { path = "../scheduler", version = "0.12.0" }
ctor = { version = "0.2" }
env_logger = { workspace = true }
object_store = { workspace = true, features = ["aws"] }
testcontainers-modules = { version = "0.11", features = ["minio"] }

[features]
azure = ["ballista-core/azure"]
default = []
s3 = ["ballista-core/s3"]
standalone = ["ballista-executor", "ballista-scheduler"]
testcontainers = []
66 changes: 40 additions & 26 deletions ballista/client/src/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.

pub use ballista_core::utils::BallistaSessionConfigExt;
pub use ballista_core::utils::SessionConfigExt;
use ballista_core::{
config::BallistaConfig,
serde::protobuf::{
scheduler_grpc_client::SchedulerGrpcClient, CreateSessionParams, KeyValuePair,
},
utils::{create_grpc_client_connection, BallistaSessionStateExt},
utils::{create_grpc_client_connection, SessionStateExt},
};
use datafusion::{
error::DataFusionError, execution::SessionState, prelude::SessionContext,
Expand Down Expand Up @@ -65,6 +65,7 @@ const DEFAULT_SCHEDULER_PORT: u16 = 50050;
/// There are still few limitations on query distribution, thus not all
/// [SessionContext] functionalities are supported.
///

#[async_trait::async_trait]
pub trait SessionContextExt {
/// Creates a context for executing queries against a standalone Ballista scheduler instance
Expand Down Expand Up @@ -144,14 +145,8 @@ impl SessionContextExt for SessionContext {
) -> datafusion::error::Result<SessionContext> {
let config = state.ballista_config();

let codec_logical = state.config().ballista_logical_extension_codec();
let codec_physical = state.config().ballista_physical_extension_codec();

let ballista_codec =
ballista_core::serde::BallistaCodec::new(codec_logical, codec_physical);

let (remote_session_id, scheduler_url) =
Extension::setup_standalone(config, ballista_codec).await?;
Extension::setup_standalone(config, Some(&state)).await?;

let session_state =
state.upgrade_for_ballista(scheduler_url, remote_session_id.clone())?;
Expand All @@ -170,10 +165,8 @@ impl SessionContextExt for SessionContext {
let config = BallistaConfig::new()
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;

let ballista_codec = ballista_core::serde::BallistaCodec::default();

let (remote_session_id, scheduler_url) =
Extension::setup_standalone(config, ballista_codec).await?;
Extension::setup_standalone(config, None).await?;

let session_state =
SessionState::new_ballista_state(scheduler_url, remote_session_id.clone())?;
Expand Down Expand Up @@ -205,14 +198,22 @@ impl Extension {
#[cfg(feature = "standalone")]
async fn setup_standalone(
config: BallistaConfig,
ballista_codec: ballista_core::serde::BallistaCodec<
datafusion_proto::protobuf::LogicalPlanNode,
datafusion_proto::protobuf::PhysicalPlanNode,
>,
session_state: Option<&SessionState>,
) -> datafusion::error::Result<(String, String)> {
let addr = ballista_scheduler::standalone::new_standalone_scheduler()
.await
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;
use ballista_core::serde::BallistaCodec;

let addr = match session_state {
None => ballista_scheduler::standalone::new_standalone_scheduler()
.await
.map_err(|e| DataFusionError::Configuration(e.to_string()))?,
Some(session_state) => {
ballista_scheduler::standalone::new_standalone_scheduler_from_state(
session_state,
)
.await
.map_err(|e| DataFusionError::Configuration(e.to_string()))?
}
};

let scheduler_url = format!("http://localhost:{}", addr.port());

Expand Down Expand Up @@ -243,13 +244,26 @@ impl Extension {
.session_id;

let concurrent_tasks = config.default_standalone_parallelism();
ballista_executor::new_standalone_executor(
scheduler,
concurrent_tasks,
ballista_codec,
)
.await
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;

match session_state {
None => {
ballista_executor::new_standalone_executor(
scheduler,
concurrent_tasks,
BallistaCodec::default(),
)
.await
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;
}
Some(session_state) => {
ballista_executor::new_standalone_executor_from_state::<
datafusion_proto::protobuf::LogicalPlanNode,
datafusion_proto::protobuf::PhysicalPlanNode,
>(scheduler, concurrent_tasks, session_state)
.await
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;
}
}

Ok((remote_session_id, scheduler_url))
}
Expand Down
89 changes: 89 additions & 0 deletions ballista/client/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,53 @@ use ballista::prelude::BallistaConfig;
use ballista_core::serde::{
protobuf::scheduler_grpc_client::SchedulerGrpcClient, BallistaCodec,
};
use datafusion::execution::SessionState;
use object_store::aws::AmazonS3Builder;
use testcontainers_modules::minio::MinIO;
use testcontainers_modules::testcontainers::core::{CmdWaitFor, ExecCommand};
use testcontainers_modules::testcontainers::ContainerRequest;
use testcontainers_modules::{minio, testcontainers::ImageExt};

pub const REGION: &str = "eu-west-1";
pub const BUCKET: &str = "ballista";
pub const ACCESS_KEY_ID: &str = "MINIO";
pub const SECRET_KEY: &str = "MINIOMINIO";

#[allow(dead_code)]
pub fn create_s3_store(
port: u16,
) -> std::result::Result<object_store::aws::AmazonS3, object_store::Error> {
AmazonS3Builder::new()
.with_endpoint(format!("http://localhost:{port}"))
.with_region(REGION)
.with_bucket_name(BUCKET)
.with_access_key_id(ACCESS_KEY_ID)
.with_secret_access_key(SECRET_KEY)
.with_allow_http(true)
.build()
}

#[allow(dead_code)]
pub fn create_minio_container() -> ContainerRequest<minio::MinIO> {
MinIO::default()
.with_env_var("MINIO_ACCESS_KEY", ACCESS_KEY_ID)
.with_env_var("MINIO_SECRET_KEY", SECRET_KEY)
}

#[allow(dead_code)]
pub fn create_bucket_command() -> ExecCommand {
// this is hack to create a bucket without creating s3 client.
// this works with current testcontainer (and image) version 'RELEASE.2022-02-07T08-17-33Z'.
// (testcontainer does not await properly on latest image version)
//
// if testcontainer image version change to something newer we should use "mc mb /data/ballista"
// to crate a bucket.
ExecCommand::new(vec![
"mkdir".to_string(),
format!("/data/{}", crate::common::BUCKET),
])
.with_cmd_ready_condition(CmdWaitFor::seconds(1))
}

// /// Remote ballista cluster to be used for local testing.
// static BALLISTA_CLUSTER: tokio::sync::OnceCell<(String, u16)> =
Expand Down Expand Up @@ -136,6 +183,48 @@ pub async fn setup_test_cluster() -> (String, u16) {
(host, addr.port())
}

/// starts a ballista cluster for integration tests
#[allow(dead_code)]
pub async fn setup_test_cluster_with_state(session_state: SessionState) -> (String, u16) {
let config = BallistaConfig::builder().build().unwrap();
//let default_codec = BallistaCodec::default();

let addr = ballista_scheduler::standalone::new_standalone_scheduler_from_state(
&session_state,
)
.await
.expect("scheduler to be created");

let host = "localhost".to_string();

let scheduler_url = format!("http://{}:{}", host, addr.port());

let scheduler = loop {
match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
Err(_) => {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
log::info!("Attempting to connect to test scheduler...");
}
Ok(scheduler) => break scheduler,
}
};

ballista_executor::new_standalone_executor_from_state::<
datafusion_proto::protobuf::LogicalPlanNode,
datafusion_proto::protobuf::PhysicalPlanNode,
>(
scheduler,
config.default_standalone_parallelism(),
&session_state,
)
.await
.expect("executor to be created");

log::info!("test scheduler created at: {}:{}", host, addr.port());

(host, addr.port())
}

#[ctor::ctor]
fn init() {
// Enable RUST_LOG logging configuration for test
Expand Down
Loading
Loading