Skip to content

Commit

Permalink
fix builder api
Browse files Browse the repository at this point in the history
  • Loading branch information
milenkovicm committed Dec 7, 2024
1 parent 1292c4a commit aadfd3e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 28 deletions.
7 changes: 7 additions & 0 deletions python/examples/standalone.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,17 @@
from datafusion.context import SessionContext

ctx: SessionContext = BallistaBuilder()\
.config("datafusion.catalog.information_schema","true")\
.config("ballista.job.name", "example ballista")\
.standalone()


ctx.sql("SELECT 1").show()

# %%
ctx.sql("SHOW TABLES").show()
# %%
ctx.sql("select name, value from information_schema.df_settings where name like 'ballista.job.name'").show()


# %%
39 changes: 11 additions & 28 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use crate::utils::wait_for_future;
use ballista::prelude::*;
use cluster::{PyExecutor, PyScheduler};
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::*;
use datafusion_python::context::PySessionContext;
use datafusion_python::utils::wait_for_future;
use pyo3::prelude::*;
use std::collections::HashMap;

mod cluster;
#[allow(dead_code)]
Expand All @@ -34,77 +33,61 @@ pub(crate) struct TokioRuntime(tokio::runtime::Runtime);
#[pymodule]
fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
pyo3_log::init();
// BallistaBuilder struct

m.add_class::<PyBallistaBuilder>()?;
// DataFusion struct
m.add_class::<datafusion_python::dataframe::PyDataFrame>()?;
m.add_class::<PyScheduler>()?;
m.add_class::<PyExecutor>()?;

Ok(())
}

// Ballista Builder will take a HasMap/Dict Cionfg
#[pyclass(name = "BallistaBuilder", module = "ballista", subclass)]
pub struct PyBallistaBuilder {
conf: HashMap<String, String>,
session_config: SessionConfig,
}

#[pymethods]
impl PyBallistaBuilder {
#[new]
pub fn new() -> Self {
Self {
conf: HashMap::new(),
session_config: SessionConfig::new_with_ballista(),
}
}

pub fn config(
mut slf: PyRefMut<'_, Self>,
k: &str,
v: &str,
key: &str,
value: &str,
py: Python,
) -> PyResult<PyObject> {
slf.conf.insert(k.into(), v.into());
let _ = slf.session_config.options_mut().set(key, value);

Ok(slf.into_py(py))
}

/// Construct the standalone instance from the SessionContext
pub fn standalone(&self, py: Python) -> PyResult<PySessionContext> {
// Build the config
let config: SessionConfig = SessionConfig::from_string_hash_map(&self.conf)?;
// Build the state
let state = SessionStateBuilder::new()
.with_config(config)
.with_config(self.session_config.clone())
.with_default_features()
.build();
// Build the context
let standalone_session = SessionContext::standalone_with_state(state);

// SessionContext is an async function
let ctx = wait_for_future(py, standalone_session)?;
let ctx = wait_for_future(py, SessionContext::standalone_with_state(state))?;

// Convert the SessionContext into a Python SessionContext
Ok(ctx.into())
}

/// Construct the remote instance from the SessionContext
pub fn remote(&self, url: &str, py: Python) -> PyResult<PySessionContext> {
// Build the config
let config: SessionConfig = SessionConfig::from_string_hash_map(&self.conf)?;
// Build the state
let state = SessionStateBuilder::new()
.with_config(config)
.with_config(self.session_config.clone())
.with_default_features()
.build();
// Build the context
let remote_session = SessionContext::remote_with_state(url, state);

// SessionContext is an async function
let ctx = wait_for_future(py, remote_session)?;
let ctx = wait_for_future(py, SessionContext::remote_with_state(url, state))?;

// Convert the SessionContext into a Python SessionContext
Ok(ctx.into())
}
}

0 comments on commit aadfd3e

Please sign in to comment.