From f90061051d1df9e92154d86ba0253537d658c322 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Sat, 7 Dec 2024 19:23:55 +0000 Subject: [PATCH] Add PyScheduler and PyExecutor --- python/Cargo.toml | 4 +- python/ballista/__init__.py | 4 +- python/examples/example.py | 15 ++- python/examples/executor.py | 27 +++++ python/examples/scheduler.py | 26 ++++ python/requirements.txt | 6 +- python/src/cluster.rs | 152 +++++++++++++++++++++++ python/src/codec.rs | 225 +++++++++++++++++++++++++++++++++++ python/src/lib.rs | 12 +- python/src/utils.rs | 40 ++++++- 10 files changed, 496 insertions(+), 15 deletions(-) create mode 100644 python/examples/executor.py create mode 100644 python/examples/scheduler.py create mode 100644 python/src/cluster.rs create mode 100644 python/src/codec.rs diff --git a/python/Cargo.toml b/python/Cargo.toml index b03f1e9978..ac63708a81 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -31,8 +31,10 @@ publish = false [dependencies] async-trait = "0.1.77" -ballista = { path = "../ballista/client", version = "0.12.0", features = ["standalone"] } +ballista = { path = "../ballista/client", version = "0.12.0" } ballista-core = { path = "../ballista/core", version = "0.12.0" } +ballista-scheduler = { path = "../ballista/scheduler", version = "0.12.0" } +ballista-executor = { path = "../ballista/executor", version = "0.12.0" } datafusion = { version = "42", features = ["pyarrow", "avro"] } datafusion-proto = { version = "42" } datafusion-python = { version = "42" } diff --git a/python/ballista/__init__.py b/python/ballista/__init__.py index a143f17e9c..4e80422b7b 100644 --- a/python/ballista/__init__.py +++ b/python/ballista/__init__.py @@ -26,11 +26,11 @@ import pyarrow as pa from .ballista_internal import ( - BallistaBuilder, + BallistaBuilder, BallistaScheduler, BallistaExecutor ) __version__ = importlib_metadata.version(__name__) __all__ = [ - "BallistaBuilder", + "BallistaBuilder", "BallistaScheduler", "BallistaExecutor" ] \ No newline at end of file diff --git a/python/examples/example.py b/python/examples/example.py index 61a9abbd2e..e15d9d10a4 100644 --- a/python/examples/example.py +++ b/python/examples/example.py @@ -15,18 +15,21 @@ # specific language governing permissions and limitations # under the License. +# %% + from ballista import BallistaBuilder from datafusion.context import SessionContext # Ballista will initiate with an empty config # set config variables with `config` -ctx: SessionContext = BallistaBuilder()\ - .config("ballista.job.name", "example ballista")\ - .config("ballista.shuffle.partitions", "16")\ - .standalone() +# ctx: SessionContext = BallistaBuilder()\ +# .config("ballista.job.name", "example ballista")\ +# .config("ballista.shuffle.partitions", "16")\ +# .standalone() -#ctx_remote: SessionContext = ballista.remote("remote_ip", 50050) +ctx: SessionContext = BallistaBuilder().remote("df://127.0.0.1:50050") # Select 1 to verify its working ctx.sql("SELECT 1").show() -#ctx_remote.sql("SELECT 2").show() \ No newline at end of file + +# %% diff --git a/python/examples/executor.py b/python/examples/executor.py new file mode 100644 index 0000000000..4f6dbfcf37 --- /dev/null +++ b/python/examples/executor.py @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# %% +from ballista import BallistaExecutor +# %% +executor = BallistaExecutor() +# %% +executor.start() +# %% +executor.wait_for_termination() +# %% +# %% diff --git a/python/examples/scheduler.py b/python/examples/scheduler.py new file mode 100644 index 0000000000..a773bcce40 --- /dev/null +++ b/python/examples/scheduler.py @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# %% +from ballista import BallistaScheduler +# %% +scheduler = BallistaScheduler() +# %% +scheduler.start() +# %% +scheduler.wait_for_termination() +# %% diff --git a/python/requirements.txt b/python/requirements.txt index a03a8f8d2e..e3383f8046 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -1,3 +1,5 @@ -datafusion==35.0.0 +datafusion==42.0.0 pyarrow -pytest \ No newline at end of file +pytest +maturin==1.5.1 +cloudpickle \ No newline at end of file diff --git a/python/src/cluster.rs b/python/src/cluster.rs new file mode 100644 index 0000000000..db30bd47db --- /dev/null +++ b/python/src/cluster.rs @@ -0,0 +1,152 @@ +use std::future::IntoFuture; +use std::sync::Arc; + +use crate::codec::{PyLogicalCodec, PyPhysicalCodec}; +use crate::utils::to_pyerr; +use crate::utils::{spawn_feature, wait_for_future}; +use ballista_executor::executor_process::{ + start_executor_process, ExecutorProcessConfig, +}; +use ballista_scheduler::cluster::BallistaCluster; +use ballista_scheduler::config::SchedulerConfig; +use ballista_scheduler::scheduler_process::start_server; +use pyo3::exceptions::PyException; +use pyo3::{pyclass, pymethods, PyResult, Python}; +use tokio::task::JoinHandle; + +#[pyclass(name = "BallistaScheduler", module = "ballista", subclass)] +pub struct PyScheduler { + config: SchedulerConfig, + handle: Option>, +} + +#[pymethods] +impl PyScheduler { + #[pyo3(signature = (bind_port=None))] + #[new] + pub fn new(py: Python, bind_port: Option) -> Self { + let mut config = SchedulerConfig::default(); + + if let Some(bind_port) = bind_port { + config.bind_port = bind_port; + } + + config.override_logical_codec = + Some(Arc::new(PyLogicalCodec::try_new(py).unwrap())); + config.override_physical_codec = + Some(Arc::new(PyPhysicalCodec::try_new(py).unwrap())); + Self { + config, + handle: None, + } + } + + pub fn start(&mut self, py: Python) -> PyResult<()> { + if self.handle.is_some() { + return Err(PyException::new_err("Scheduler already started")); + } + let cluster = wait_for_future(py, BallistaCluster::new_from_config(&self.config)) + .map_err(|e| to_pyerr(e))?; + + let config = self.config.clone(); + let addr = format!("0.0.0.0:{}", config.bind_port); + let addr = addr.parse()?; + let handle = spawn_feature(py, async move { + start_server(cluster, addr, Arc::new(config)).await.unwrap(); + }); + self.handle = Some(handle); + + Ok(()) + } + + pub fn wait_for_termination(&mut self, py: Python) -> PyResult<()> { + if self.handle.is_none() { + return Err(PyException::new_err("Scheduler not started")); + } + let mut handle = None; + std::mem::swap(&mut self.handle, &mut handle); + + match handle { + Some(handle) => wait_for_future(py, handle.into_future()) + .map_err(|e| PyException::new_err(e.to_string())), + None => Ok(()), + } + } +} + +#[pyclass(name = "BallistaExecutor", module = "ballista", subclass)] +pub struct PyExecutor { + config: Arc, + handle: Option>, +} + +#[pymethods] +impl PyExecutor { + #[pyo3(signature = (bind_port=None, bind_host =None, scheduler_host = None, scheduler_port = None))] + #[new] + pub fn new( + py: Python, + bind_port: Option, + bind_host: Option, + scheduler_host: Option, + scheduler_port: Option, + ) -> PyResult { + let mut config = ExecutorProcessConfig::default(); + if let Some(port) = bind_port { + config.port = port; + } + + if let Some(host) = bind_host { + config.bind_host = host; + } + + if let Some(port) = scheduler_port { + config.scheduler_port = port; + } + + if let Some(host) = scheduler_host { + config.scheduler_host = host; + } + + config.override_logical_codec = Some(Arc::new(PyLogicalCodec::try_new(py)?)); + config.override_physical_codec = Some(Arc::new(PyPhysicalCodec::try_new(py)?)); + + let config = Arc::new(config); + Ok(Self { + config, + handle: None, + }) + } + + pub fn start(&mut self, py: Python) -> PyResult<()> { + if self.handle.is_some() { + return Err(PyException::new_err("Executor already started")); + } + + let config = self.config.clone(); + + let handle = + spawn_feature( + py, + async move { start_executor_process(config).await.unwrap() }, + ); + self.handle = Some(handle); + + Ok(()) + } + + pub fn wait_for_termination(&mut self, py: Python) -> PyResult<()> { + if self.handle.is_none() { + return Err(PyException::new_err("Executor not started")); + } + let mut handle = None; + std::mem::swap(&mut self.handle, &mut handle); + + match handle { + Some(handle) => wait_for_future(py, handle.into_future()) + .map_err(|e| PyException::new_err(e.to_string())) + .map(|_| ()), + None => Ok(()), + } + } +} diff --git a/python/src/codec.rs b/python/src/codec.rs new file mode 100644 index 0000000000..2cf5b68187 --- /dev/null +++ b/python/src/codec.rs @@ -0,0 +1,225 @@ +use std::sync::Arc; + +use ballista_core::serde::{ + BallistaLogicalExtensionCodec, BallistaPhysicalExtensionCodec, +}; +use datafusion::logical_expr::ScalarUDF; +use datafusion_proto::logical_plan::LogicalExtensionCodec; +use datafusion_proto::physical_plan::PhysicalExtensionCodec; +use pyo3::types::{PyAnyMethods, PyBytes, PyBytesMethods}; +use pyo3::{PyObject, PyResult, Python}; + +static MODULE: &str = "cloudpickle"; +static FUN_LOADS: &str = "loads"; +static FUN_DUMPS: &str = "dumps"; + +#[derive(Debug)] +struct CloudPickle { + loads: PyObject, + dumps: PyObject, +} + +impl CloudPickle { + pub fn try_new(py: Python<'_>) -> PyResult { + let module = py.import_bound(MODULE)?; + let loads = module.getattr(FUN_LOADS)?.unbind(); + let dumps = module.getattr(FUN_DUMPS)?.unbind(); + + Ok(Self { loads, dumps }) + } + + pub fn pickle(&self, py: Python<'_>, py_any: &PyObject) -> PyResult> { + let b: PyObject = self.dumps.call1(py, (py_any,))?.extract(py)?; + let blob = b.downcast_bound::(py)?.clone(); + + Ok(blob.as_bytes().to_owned()) + } + + pub fn unpickle(&self, py: Python<'_>, blob: &[u8]) -> PyResult { + let t: PyObject = self.loads.call1(py, (blob,))?.extract(py)?; + + Ok(t) + } +} + +#[derive(Debug)] +pub struct PyLogicalCodec { + inner: BallistaLogicalExtensionCodec, + cloudpickle: CloudPickle, +} + +impl PyLogicalCodec { + pub fn try_new(py: Python<'_>) -> PyResult { + Ok(Self { + inner: BallistaLogicalExtensionCodec::default(), + cloudpickle: CloudPickle::try_new(py)?, + }) + } +} + +impl LogicalExtensionCodec for PyLogicalCodec { + fn try_decode( + &self, + buf: &[u8], + inputs: &[datafusion::logical_expr::LogicalPlan], + ctx: &datafusion::prelude::SessionContext, + ) -> datafusion::error::Result { + self.inner.try_decode(buf, inputs, ctx) + } + + fn try_encode( + &self, + node: &datafusion::logical_expr::Extension, + buf: &mut Vec, + ) -> datafusion::error::Result<()> { + self.inner.try_encode(node, buf) + } + + fn try_decode_table_provider( + &self, + buf: &[u8], + table_ref: &datafusion::sql::TableReference, + schema: datafusion::arrow::datatypes::SchemaRef, + ctx: &datafusion::prelude::SessionContext, + ) -> datafusion::error::Result> + { + self.inner + .try_decode_table_provider(buf, table_ref, schema, ctx) + } + + fn try_encode_table_provider( + &self, + table_ref: &datafusion::sql::TableReference, + node: std::sync::Arc, + buf: &mut Vec, + ) -> datafusion::error::Result<()> { + self.inner.try_encode_table_provider(table_ref, node, buf) + } + + fn try_decode_file_format( + &self, + buf: &[u8], + ctx: &datafusion::prelude::SessionContext, + ) -> datafusion::error::Result< + std::sync::Arc, + > { + self.inner.try_decode_file_format(buf, ctx) + } + + fn try_encode_file_format( + &self, + buf: &mut Vec, + node: std::sync::Arc, + ) -> datafusion::error::Result<()> { + self.inner.try_encode_file_format(buf, node) + } + + fn try_decode_udf( + &self, + name: &str, + buf: &[u8], + ) -> datafusion::error::Result> + { + // use cloud pickle to decode udf + self.inner.try_decode_udf(name, buf) + } + + fn try_encode_udf( + &self, + node: &datafusion::logical_expr::ScalarUDF, + buf: &mut Vec, + ) -> datafusion::error::Result<()> { + // use cloud pickle to decode udf + self.inner.try_encode_udf(node, buf) + } + + fn try_decode_udaf( + &self, + name: &str, + buf: &[u8], + ) -> datafusion::error::Result> + { + self.inner.try_decode_udaf(name, buf) + } + + fn try_encode_udaf( + &self, + node: &datafusion::logical_expr::AggregateUDF, + buf: &mut Vec, + ) -> datafusion::error::Result<()> { + self.inner.try_encode_udaf(node, buf) + } + + fn try_decode_udwf( + &self, + name: &str, + buf: &[u8], + ) -> datafusion::error::Result> + { + self.inner.try_decode_udwf(name, buf) + } + + fn try_encode_udwf( + &self, + node: &datafusion::logical_expr::WindowUDF, + buf: &mut Vec, + ) -> datafusion::error::Result<()> { + self.inner.try_encode_udwf(node, buf) + } +} + +#[derive(Debug)] +pub struct PyPhysicalCodec { + inner: BallistaPhysicalExtensionCodec, + cloudpickle: CloudPickle, +} + +impl PyPhysicalCodec { + pub fn try_new(py: Python<'_>) -> PyResult { + Ok(Self { + inner: BallistaPhysicalExtensionCodec::default(), + cloudpickle: CloudPickle::try_new(py)?, + }) + } +} + +impl PhysicalExtensionCodec for PyPhysicalCodec { + fn try_decode( + &self, + buf: &[u8], + inputs: &[std::sync::Arc], + registry: &dyn datafusion::execution::FunctionRegistry, + ) -> datafusion::error::Result< + std::sync::Arc, + > { + self.inner.try_decode(buf, inputs, registry) + } + + fn try_encode( + &self, + node: std::sync::Arc, + buf: &mut Vec, + ) -> datafusion::error::Result<()> { + self.inner.try_encode(node, buf) + } + + fn try_decode_udf( + &self, + name: &str, + _buf: &[u8], + ) -> datafusion::common::Result> { + // use cloudpickle here + datafusion::common::not_impl_err!( + "PhysicalExtensionCodec is not provided for scalar function {name}" + ) + } + + fn try_encode_udf( + &self, + _node: &ScalarUDF, + _buf: &mut Vec, + ) -> datafusion::common::Result<()> { + // use cloudpickle here + Ok(()) + } +} diff --git a/python/src/lib.rs b/python/src/lib.rs index 41b4b6d31d..54c5fc063d 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -16,16 +16,19 @@ // under the License. use ballista::prelude::*; +use cluster::{PyExecutor, PyScheduler}; use datafusion::execution::SessionStateBuilder; use datafusion::prelude::*; use datafusion_python::context::PySessionContext; use datafusion_python::utils::wait_for_future; - +use pyo3::prelude::*; use std::collections::HashMap; -use pyo3::prelude::*; +mod cluster; +mod codec; mod utils; -use utils::to_pyerr; + +pub(crate) struct TokioRuntime(tokio::runtime::Runtime); #[pymodule] fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { @@ -34,6 +37,9 @@ fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; // DataFusion struct m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + Ok(()) } diff --git a/python/src/utils.rs b/python/src/utils.rs index 10278537eb..f069475eaf 100644 --- a/python/src/utils.rs +++ b/python/src/utils.rs @@ -15,10 +15,48 @@ // specific language governing permissions and limitations // under the License. +use std::future::Future; +use std::sync::OnceLock; +use tokio::task::JoinHandle; + use ballista_core::error::BallistaError; use pyo3::exceptions::PyException; -use pyo3::PyErr; +use pyo3::{PyErr, Python}; +use tokio::runtime::Runtime; + +use crate::TokioRuntime; pub(crate) fn to_pyerr(err: BallistaError) -> PyErr { PyException::new_err(err.to_string()) } + +#[inline] +pub(crate) fn get_tokio_runtime() -> &'static TokioRuntime { + // NOTE: Other pyo3 python libraries have had issues with using tokio + // behind a forking app-server like `gunicorn` + // If we run into that problem, in the future we can look to `delta-rs` + // which adds a check in that disallows calls from a forked process + // https://github.com/delta-io/delta-rs/blob/87010461cfe01563d91a4b9cd6fa468e2ad5f283/python/src/utils.rs#L10-L31 + static RUNTIME: OnceLock = OnceLock::new(); + RUNTIME.get_or_init(|| TokioRuntime(tokio::runtime::Runtime::new().unwrap())) +} + +/// Utility to collect rust futures with GIL released +pub(crate) fn wait_for_future(py: Python, f: F) -> F::Output +where + F: Future + Send, + F::Output: Send, +{ + let runtime: &Runtime = &get_tokio_runtime().0; + py.allow_threads(|| runtime.block_on(f)) +} + +pub(crate) fn spawn_feature(py: Python, f: F) -> JoinHandle +where + F: Future + Send + 'static, + F::Output: Send, +{ + let runtime: &Runtime = &get_tokio_runtime().0; + // do we need py.allow_threads ? + py.allow_threads(|| runtime.spawn(f)) +}