Skip to content

Commit

Permalink
Add PyScheduler and PyExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
milenkovicm committed Dec 7, 2024
1 parent 3af9ae0 commit f900610
Show file tree
Hide file tree
Showing 10 changed files with 496 additions and 15 deletions.
4 changes: 3 additions & 1 deletion python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ publish = false

[dependencies]
async-trait = "0.1.77"
ballista = { path = "../ballista/client", version = "0.12.0", features = ["standalone"] }
ballista = { path = "../ballista/client", version = "0.12.0" }
ballista-core = { path = "../ballista/core", version = "0.12.0" }
ballista-scheduler = { path = "../ballista/scheduler", version = "0.12.0" }
ballista-executor = { path = "../ballista/executor", version = "0.12.0" }
datafusion = { version = "42", features = ["pyarrow", "avro"] }
datafusion-proto = { version = "42" }
datafusion-python = { version = "42" }
Expand Down
4 changes: 2 additions & 2 deletions python/ballista/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
import pyarrow as pa

from .ballista_internal import (
BallistaBuilder,
BallistaBuilder, BallistaScheduler, BallistaExecutor
)

__version__ = importlib_metadata.version(__name__)

__all__ = [
"BallistaBuilder",
"BallistaBuilder", "BallistaScheduler", "BallistaExecutor"
]
15 changes: 9 additions & 6 deletions python/examples/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,21 @@
# specific language governing permissions and limitations
# under the License.

# %%

from ballista import BallistaBuilder
from datafusion.context import SessionContext

# Ballista will initiate with an empty config
# set config variables with `config`
ctx: SessionContext = BallistaBuilder()\
.config("ballista.job.name", "example ballista")\
.config("ballista.shuffle.partitions", "16")\
.standalone()
# ctx: SessionContext = BallistaBuilder()\
# .config("ballista.job.name", "example ballista")\
# .config("ballista.shuffle.partitions", "16")\
# .standalone()

#ctx_remote: SessionContext = ballista.remote("remote_ip", 50050)
ctx: SessionContext = BallistaBuilder().remote("df://127.0.0.1:50050")

# Select 1 to verify it's working
ctx.sql("SELECT 1").show()
#ctx_remote.sql("SELECT 2").show()

# %%
27 changes: 27 additions & 0 deletions python/examples/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# %%
# Example: run a Ballista executor from Python.
from ballista import BallistaExecutor
# %%
# No arguments are passed, so the bind and scheduler host/port keep their defaults.
executor = BallistaExecutor()
# %%
# Launch the executor in the background; a second start() raises "Executor already started".
executor.start()
# %%
# Block until the background executor task finishes.
executor.wait_for_termination()
# %%
# %%
26 changes: 26 additions & 0 deletions python/examples/scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# %%
# Example: run a Ballista scheduler from Python.
from ballista import BallistaScheduler
# %%
# No bind_port is passed, so the scheduler keeps its default port.
scheduler = BallistaScheduler()
# %%
# Launch the scheduler in the background; a second start() raises "Scheduler already started".
scheduler.start()
# %%
# Block until the background scheduler task finishes.
scheduler.wait_for_termination()
# %%
6 changes: 4 additions & 2 deletions python/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
datafusion==35.0.0
datafusion==42.0.0
pyarrow
pytest
pytest
maturin==1.5.1
cloudpickle
152 changes: 152 additions & 0 deletions python/src/cluster.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
use std::future::IntoFuture;
use std::sync::Arc;

use crate::codec::{PyLogicalCodec, PyPhysicalCodec};
use crate::utils::to_pyerr;
use crate::utils::{spawn_feature, wait_for_future};
use ballista_executor::executor_process::{
start_executor_process, ExecutorProcessConfig,
};
use ballista_scheduler::cluster::BallistaCluster;
use ballista_scheduler::config::SchedulerConfig;
use ballista_scheduler::scheduler_process::start_server;
use pyo3::exceptions::PyException;
use pyo3::{pyclass, pymethods, PyResult, Python};
use tokio::task::JoinHandle;

/// Python-facing handle for a Ballista scheduler, exported as `ballista.BallistaScheduler`.
#[pyclass(name = "BallistaScheduler", module = "ballista", subclass)]
pub struct PyScheduler {
/// Scheduler settings assembled in `new`; `start` clones them for the server task.
config: SchedulerConfig,
/// Join handle of the spawned server task. `None` until `start` succeeds, and
/// `None` again once `wait_for_termination` has taken it.
handle: Option<JoinHandle<()>>,
}

#[pymethods]
impl PyScheduler {
#[pyo3(signature = (bind_port=None))]
#[new]
pub fn new(py: Python, bind_port: Option<u16>) -> Self {
let mut config = SchedulerConfig::default();

if let Some(bind_port) = bind_port {
config.bind_port = bind_port;
}

config.override_logical_codec =
Some(Arc::new(PyLogicalCodec::try_new(py).unwrap()));
config.override_physical_codec =
Some(Arc::new(PyPhysicalCodec::try_new(py).unwrap()));
Self {
config,
handle: None,
}
}

pub fn start(&mut self, py: Python) -> PyResult<()> {
if self.handle.is_some() {
return Err(PyException::new_err("Scheduler already started"));
}
let cluster = wait_for_future(py, BallistaCluster::new_from_config(&self.config))
.map_err(|e| to_pyerr(e))?;

let config = self.config.clone();
let addr = format!("0.0.0.0:{}", config.bind_port);
let addr = addr.parse()?;
let handle = spawn_feature(py, async move {
start_server(cluster, addr, Arc::new(config)).await.unwrap();
});
self.handle = Some(handle);

Ok(())
}

pub fn wait_for_termination(&mut self, py: Python) -> PyResult<()> {
if self.handle.is_none() {
return Err(PyException::new_err("Scheduler not started"));
}
let mut handle = None;
std::mem::swap(&mut self.handle, &mut handle);

match handle {
Some(handle) => wait_for_future(py, handle.into_future())
.map_err(|e| PyException::new_err(e.to_string())),
None => Ok(()),
}
}
}

/// Python-facing handle for a Ballista executor, exported as `ballista.BallistaExecutor`.
#[pyclass(name = "BallistaExecutor", module = "ballista", subclass)]
pub struct PyExecutor {
/// Executor settings assembled in `new`; shared with the executor task by `start`.
config: Arc<ExecutorProcessConfig>,
/// Join handle of the spawned executor task. `None` until `start` is called, and
/// `None` again once `wait_for_termination` has taken it.
handle: Option<JoinHandle<()>>,
}

#[pymethods]
impl PyExecutor {
    /// Creates an executor that has not been started yet.
    ///
    /// Every argument is optional; when given, it replaces the matching value
    /// from `ExecutorProcessConfig::default()`:
    /// `bind_port` -> `port`, `bind_host` -> `bind_host`,
    /// `scheduler_host` -> `scheduler_host`, `scheduler_port` -> `scheduler_port`.
    /// The Python-aware logical and physical codecs are always installed as
    /// codec overrides.
    ///
    /// # Errors
    ///
    /// Propagates the error from constructing either codec.
    #[pyo3(signature = (bind_port=None, bind_host =None, scheduler_host = None, scheduler_port = None))]
    #[new]
    pub fn new(
        py: Python,
        bind_port: Option<u16>,
        bind_host: Option<String>,
        scheduler_host: Option<String>,
        scheduler_port: Option<u16>,
    ) -> PyResult<Self> {
        let mut config = ExecutorProcessConfig::default();

        if let Some(port) = bind_port {
            config.port = port;
        }

        if let Some(host) = bind_host {
            config.bind_host = host;
        }

        if let Some(port) = scheduler_port {
            config.scheduler_port = port;
        }

        if let Some(host) = scheduler_host {
            config.scheduler_host = host;
        }

        config.override_logical_codec = Some(Arc::new(PyLogicalCodec::try_new(py)?));
        config.override_physical_codec = Some(Arc::new(PyPhysicalCodec::try_new(py)?));

        Ok(Self {
            config: Arc::new(config),
            handle: None,
        })
    }

    /// Starts the executor process as a background task and remembers its
    /// join handle.
    ///
    /// # Errors
    ///
    /// Fails with "Executor already started" if a task handle is already stored.
    pub fn start(&mut self, py: Python) -> PyResult<()> {
        if self.handle.is_some() {
            return Err(PyException::new_err("Executor already started"));
        }

        let config = self.config.clone();

        // NOTE(review): an executor error panics the spawned task (`unwrap`);
        // presumably this surfaces as a join error in `wait_for_termination`.
        let handle = spawn_feature(py, async move {
            start_executor_process(config).await.unwrap()
        });
        self.handle = Some(handle);

        Ok(())
    }

    /// Blocks until the background executor task finishes.
    ///
    /// The stored join handle is consumed, so afterwards the executor counts
    /// as "not started" again.
    ///
    /// # Errors
    ///
    /// Fails with "Executor not started" when there is no running task, or
    /// with the join error's message if waiting on the task fails.
    pub fn wait_for_termination(&mut self, py: Python) -> PyResult<()> {
        let handle = self
            .handle
            .take()
            .ok_or_else(|| PyException::new_err("Executor not started"))?;

        wait_for_future(py, handle.into_future())
            .map_err(|e| PyException::new_err(e.to_string()))
    }
}
Loading

0 comments on commit f900610

Please sign in to comment.