From 7debc5ed6949b6101df7d4fab081cc0d47b74c5b Mon Sep 17 00:00:00 2001 From: tbar4 Date: Sun, 27 Oct 2024 18:50:23 -0700 Subject: [PATCH 01/33] added a pycontext to ballista --- python/Cargo.toml | 4 ++-- python/pyballista/context.py | 24 ++++++++++++++++++++++++ python/src/context.rs | 33 ++++++++++++++++++++++++++------- 3 files changed, 52 insertions(+), 9 deletions(-) create mode 100644 python/pyballista/context.py diff --git a/python/Cargo.toml b/python/Cargo.toml index dbe419d28..773135c02 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -31,9 +31,9 @@ publish = false [dependencies] async-trait = "0.1.77" -ballista = { path = "../ballista/client", version = "0.12.0" } +ballista = { path = "../ballista/client", version = "0.12.0", features = ["standalone"]} ballista-core = { path = "../ballista/core", version = "0.12.0" } -datafusion = { version = "42" } +datafusion = { version = "42", features = ["pyarrow", "avro"] } datafusion-proto = { version = "42" } datafusion-python = { version = "42" } diff --git a/python/pyballista/context.py b/python/pyballista/context.py new file mode 100644 index 000000000..53a3a5290 --- /dev/null +++ b/python/pyballista/context.py @@ -0,0 +1,24 @@ +from contextlib import ContextDecorator +import json +import os +import time +from typing import Iterable + +import pyarrow as pa + +import pyballista +from pyballista import BallistaContext +from typing import List, Any +from datafusion import SessionContext + + +class BallistaContext: + def __init__(self, df_ctx: SessionContext): + self.df_ctx = df_ctx + self.ctx = BallistaContext(df_ctx) + + def register_csv(self, table_name: str, path: str, has_header: bool): + self.ctx.register_csv(table_name, path, has_header) + + def register_parquet(self, table_name: str, path: str): + self.ctx.register_parquet(table_name, path) \ No newline at end of file diff --git a/python/src/context.rs b/python/src/context.rs index eccdede9e..4033ac7f8 100644 --- a/python/src/context.rs +++ 
b/python/src/context.rs @@ -22,6 +22,7 @@ use pyo3::prelude::*; use std::path::PathBuf; use ballista::prelude::*; +use ballista::context::BallistaContext; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::pyarrow::PyArrowType; use datafusion::prelude::*; @@ -41,22 +42,40 @@ use datafusion_python::utils::wait_for_future; /// SessionContext. We could probably add extra extension points to /// DataFusion to allow for a pluggable context and remove much of /// this code. -#[pyclass(name = "SessionContext", module = "pyballista", subclass)] +#[pyclass(name = "BallistaContext", module = "pyballista", subclass)] pub struct PySessionContext { ctx: BallistaContext, } +#[pyclass(name = "BallistaMode", module = "pyballista", eq, eq_int)] +#[derive(Clone, PartialEq)] +pub enum BallistaMode { + Standalone, + Remote, +} + #[pymethods] impl PySessionContext { /// Create a new SessionContext by connecting to a Ballista scheduler process. #[new] - pub fn new(host: &str, port: u16, py: Python) -> PyResult { - let config = BallistaConfig::new().unwrap(); - let ballista_context = BallistaContext::remote(host, port, &config); - let ctx = wait_for_future(py, ballista_context).map_err(to_pyerr)?; - Ok(Self { ctx }) + #[pyo3(signature = (mode, concurrent_tasks, host=None, port=None))] + pub fn new(mode: BallistaMode, concurrent_tasks: usize, host: Option<&str>, port: Option, py: Python) -> PyResult { + match mode { + BallistaMode::Standalone => { + let config = BallistaConfig::new().unwrap(); + let ballista_context = BallistaContext::standalone(&config, concurrent_tasks); + let ctx = wait_for_future(py, ballista_context).map_err(to_pyerr)?; + Ok(Self { ctx }) + } + BallistaMode::Remote => { + let config = BallistaConfig::new().unwrap(); + let ballista_context = BallistaContext::remote(host.unwrap(), port.unwrap(), &config); + let ctx = wait_for_future(py, ballista_context).map_err(to_pyerr)?; + Ok(Self { ctx }) + } + } } - + pub fn sql(&mut self, query: &str, py: Python) -> 
PyResult { let result = self.ctx.sql(query); let df = wait_for_future(py, result)?; From ace4009874ec0f5eb03f759a12009485ece72e3b Mon Sep 17 00:00:00 2001 From: tbar4 Date: Sun, 27 Oct 2024 18:59:48 -0700 Subject: [PATCH 02/33] added a pycontext to ballista --- python/pyballista/context.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/python/pyballista/context.py b/python/pyballista/context.py index 53a3a5290..b2a515f10 100644 --- a/python/pyballista/context.py +++ b/python/pyballista/context.py @@ -1,3 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ from contextlib import ContextDecorator import json import os From 9b45b2b296d245e2e52198b10e8eae8905bb34de Mon Sep 17 00:00:00 2001 From: tbar4 Date: Sun, 27 Oct 2024 19:29:07 -0700 Subject: [PATCH 03/33] added a pycontext to ballista --- python/Cargo.toml | 2 +- python/src/context.rs | 18 +++++++++++++----- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/python/Cargo.toml b/python/Cargo.toml index 773135c02..c14e94757 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -31,7 +31,7 @@ publish = false [dependencies] async-trait = "0.1.77" -ballista = { path = "../ballista/client", version = "0.12.0", features = ["standalone"]} +ballista = { path = "../ballista/client", version = "0.12.0", features = ["standalone"] } ballista-core = { path = "../ballista/core", version = "0.12.0" } datafusion = { version = "42", features = ["pyarrow", "avro"] } datafusion-proto = { version = "42" } diff --git a/python/src/context.rs b/python/src/context.rs index 4033ac7f8..7004ed6ff 100644 --- a/python/src/context.rs +++ b/python/src/context.rs @@ -21,8 +21,8 @@ use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use std::path::PathBuf; -use ballista::prelude::*; use ballista::context::BallistaContext; +use ballista::prelude::*; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::pyarrow::PyArrowType; use datafusion::prelude::*; @@ -59,23 +59,31 @@ impl PySessionContext { /// Create a new SessionContext by connecting to a Ballista scheduler process. 
#[new] #[pyo3(signature = (mode, concurrent_tasks, host=None, port=None))] - pub fn new(mode: BallistaMode, concurrent_tasks: usize, host: Option<&str>, port: Option, py: Python) -> PyResult { + pub fn new( + mode: BallistaMode, + concurrent_tasks: usize, + host: Option<&str>, + port: Option, + py: Python, + ) -> PyResult { match mode { BallistaMode::Standalone => { let config = BallistaConfig::new().unwrap(); - let ballista_context = BallistaContext::standalone(&config, concurrent_tasks); + let ballista_context = + BallistaContext::standalone(&config, concurrent_tasks); let ctx = wait_for_future(py, ballista_context).map_err(to_pyerr)?; Ok(Self { ctx }) } BallistaMode::Remote => { let config = BallistaConfig::new().unwrap(); - let ballista_context = BallistaContext::remote(host.unwrap(), port.unwrap(), &config); + let ballista_context = + BallistaContext::remote(host.unwrap(), port.unwrap(), &config); let ctx = wait_for_future(py, ballista_context).map_err(to_pyerr)?; Ok(Self { ctx }) } } } - + pub fn sql(&mut self, query: &str, py: Python) -> PyResult { let result = self.ctx.sql(query); let df = wait_for_future(py, result)?; From 7961b459569fd084dcda19d44d378fbc759a9962 Mon Sep 17 00:00:00 2001 From: Trevor Barnes Date: Wed, 30 Oct 2024 18:48:51 -0700 Subject: [PATCH 04/33] updated python to have two static methods for creating a ballista context --- python/pyballista/__init__.py | 4 +-- python/pyballista/context.py | 10 ++++--- python/src/context.rs | 50 ++++++++++++----------------------- 3 files changed, 26 insertions(+), 38 deletions(-) diff --git a/python/pyballista/__init__.py b/python/pyballista/__init__.py index 62a6bc790..da41aea23 100644 --- a/python/pyballista/__init__.py +++ b/python/pyballista/__init__.py @@ -26,11 +26,11 @@ import pyarrow as pa from .pyballista_internal import ( - SessionContext, + BallistaContext, ) __version__ = importlib_metadata.version(__name__) __all__ = [ - "SessionContext", + "BallistaContext", ] diff --git 
a/python/pyballista/context.py b/python/pyballista/context.py index b2a515f10..15e7fa2a6 100644 --- a/python/pyballista/context.py +++ b/python/pyballista/context.py @@ -30,9 +30,13 @@ class BallistaContext: - def __init__(self, df_ctx: SessionContext): - self.df_ctx = df_ctx - self.ctx = BallistaContext(df_ctx) + def remote(self, host: str, port: int): + ballista = self.remote(host, port) + self.ctx = BallistaContext() + + def standalone(self, concurrent_tasks: int): + ballista = self.standalone(concurrent_tasks) + self.ctx = BallistaContext() def register_csv(self, table_name: str, path: str, has_header: bool): self.ctx.register_csv(table_name, path, has_header) diff --git a/python/src/context.rs b/python/src/context.rs index 7004ed6ff..a22b18849 100644 --- a/python/src/context.rs +++ b/python/src/context.rs @@ -47,41 +47,25 @@ pub struct PySessionContext { ctx: BallistaContext, } -#[pyclass(name = "BallistaMode", module = "pyballista", eq, eq_int)] -#[derive(Clone, PartialEq)] -pub enum BallistaMode { - Standalone, - Remote, -} - #[pymethods] impl PySessionContext { - /// Create a new SessionContext by connecting to a Ballista scheduler process. 
- #[new] - #[pyo3(signature = (mode, concurrent_tasks, host=None, port=None))] - pub fn new( - mode: BallistaMode, - concurrent_tasks: usize, - host: Option<&str>, - port: Option, - py: Python, - ) -> PyResult { - match mode { - BallistaMode::Standalone => { - let config = BallistaConfig::new().unwrap(); - let ballista_context = - BallistaContext::standalone(&config, concurrent_tasks); - let ctx = wait_for_future(py, ballista_context).map_err(to_pyerr)?; - Ok(Self { ctx }) - } - BallistaMode::Remote => { - let config = BallistaConfig::new().unwrap(); - let ballista_context = - BallistaContext::remote(host.unwrap(), port.unwrap(), &config); - let ctx = wait_for_future(py, ballista_context).map_err(to_pyerr)?; - Ok(Self { ctx }) - } - } + #[staticmethod] + #[pyo3(signature = (host="0.0.0.0", port=50500))] + pub fn remote(host: Option<&str>, port: Option, py: Python) -> PyResult { + let config = BallistaConfig::new().unwrap(); + let ballista_context = + BallistaContext::remote(host.unwrap(), port.unwrap(), &config); + let ctx = wait_for_future(py, ballista_context).map_err(to_pyerr)?; + Ok(Self { ctx }) + } + + #[staticmethod] + #[pyo3(signature = (concurrent_tasks=4))] + pub fn standalone(concurrent_tasks: usize, py: Python) -> PyResult { + let config = BallistaConfig::new().unwrap(); + let ballista_context = BallistaContext::standalone(&config, concurrent_tasks); + let ctx = wait_for_future(py, ballista_context).map_err(to_pyerr)?; + Ok(Self { ctx }) } pub fn sql(&mut self, query: &str, py: Python) -> PyResult { From a387cc86f7077b03ffb541103904bcbbd1b7c0ba Mon Sep 17 00:00:00 2001 From: Trevor Barnes Date: Thu, 31 Oct 2024 16:19:54 -0700 Subject: [PATCH 05/33] updated python to have two static methods for creating a ballista context --- examples/examples/remote-dataframe.rs | 7 ++---- python/examples/ctx.py | 4 ++++ python/pyballista/__init__.py | 2 +- python/pyballista/context.py | 31 ++++++++++++++------------- python/src/context.rs | 12 +++++++++++ 5 files 
changed, 35 insertions(+), 21 deletions(-) create mode 100644 python/examples/ctx.py diff --git a/examples/examples/remote-dataframe.rs b/examples/examples/remote-dataframe.rs index 6b190cea6..4df68a5c3 100644 --- a/examples/examples/remote-dataframe.rs +++ b/examples/examples/remote-dataframe.rs @@ -25,16 +25,13 @@ async fn main() -> Result<()> { let config = BallistaConfig::builder() .set("ballista.shuffle.partitions", "4") .build()?; - let ctx = BallistaContext::remote("localhost", 50050, &config).await?; + let ctx = BallistaContext::remote("10.103.0.25", 50050, &config).await?; let filename = "testdata/alltypes_plain.parquet"; // define the query using the DataFrame trait let df = ctx - .read_parquet(filename, ParquetReadOptions::default()) - .await? - .select_columns(&["id", "bool_col", "timestamp_col"])? - .filter(col("id").gt(lit(1)))?; + .sql("SELECT 1").await?; // print the results df.show().await?; diff --git a/python/examples/ctx.py b/python/examples/ctx.py new file mode 100644 index 000000000..b6b345d83 --- /dev/null +++ b/python/examples/ctx.py @@ -0,0 +1,4 @@ +from pyballista.context import BallistaContext + +ballista = BallistaContext("remote", host="10.103.0.25").sql("SELECT 5434535") +print(ballista) \ No newline at end of file diff --git a/python/pyballista/__init__.py b/python/pyballista/__init__.py index da41aea23..7e4c9e7c4 100644 --- a/python/pyballista/__init__.py +++ b/python/pyballista/__init__.py @@ -33,4 +33,4 @@ __all__ = [ "BallistaContext", -] +] \ No newline at end of file diff --git a/python/pyballista/context.py b/python/pyballista/context.py index 15e7fa2a6..2f644c1cb 100644 --- a/python/pyballista/context.py +++ b/python/pyballista/context.py @@ -24,22 +24,23 @@ import pyarrow as pa import pyballista -from pyballista import BallistaContext -from typing import List, Any -from datafusion import SessionContext +from typing import List, Any class BallistaContext: - def remote(self, host: str, port: int): - ballista = self.remote(host, 
port) - self.ctx = BallistaContext() - - def standalone(self, concurrent_tasks: int): - ballista = self.standalone(concurrent_tasks) - self.ctx = BallistaContext() - - def register_csv(self, table_name: str, path: str, has_header: bool): - self.ctx.register_csv(table_name, path, has_header) + def __init__(self, mode: str = "standalone", host: str = "0.0.0.0", port: int = 50050, concurrent_tasks: int = 4): + if mode == "standalone": + self.ctx = pyballista.BallistaContext().standalone(concurrent_tasks) + else: + self.ctx = pyballista.BallistaContext().remote(host, port) + + def sql(self, sql: str) -> pa.RecordBatch: + # TODO we should parse sql and inspect the plan rather than + # perform a string comparison here + sql_str = sql.lower() + if "create view" in sql_str or "drop view" in sql_str: + self.ctx.sql(sql) + return [] - def register_parquet(self, table_name: str, path: str): - self.ctx.register_parquet(table_name, path) \ No newline at end of file + df = self.ctx.sql(sql) + return df \ No newline at end of file diff --git a/python/src/context.rs b/python/src/context.rs index a22b18849..c47211448 100644 --- a/python/src/context.rs +++ b/python/src/context.rs @@ -20,6 +20,7 @@ use datafusion::logical_expr::SortExpr; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use std::path::PathBuf; +use std::sync::Mutex; use ballista::context::BallistaContext; use ballista::prelude::*; @@ -36,6 +37,8 @@ use datafusion_python::expr::sort_expr::PySortExpr; use datafusion_python::sql::logical::PyLogicalPlan; use datafusion_python::utils::wait_for_future; +use ballista_core::utils::create_df_ctx_with_ballista_query_planner; + /// PyBallista session context. 
This is largely a duplicate of /// DataFusion's PySessionContext, with the main difference being /// that this operates on a BallistaContext instead of DataFusion's @@ -49,6 +52,15 @@ pub struct PySessionContext { #[pymethods] impl PySessionContext { + #[new] + #[pyo3(signature = (concurrent_tasks=4))] + pub fn new(concurrent_tasks: usize, py: Python) -> PyResult { + let config = BallistaConfig::new().unwrap(); + let ballista_context = BallistaContext::standalone(&config, concurrent_tasks); + let ctx = wait_for_future(py, ballista_context).map_err(to_pyerr)?; + Ok(Self { ctx }) + } + #[staticmethod] #[pyo3(signature = (host="0.0.0.0", port=50500))] pub fn remote(host: Option<&str>, port: Option, py: Python) -> PyResult { From 3cae1b8c333bdb90d4290ff465fa655a48bf34aa Mon Sep 17 00:00:00 2001 From: Trevor Barnes Date: Fri, 1 Nov 2024 09:43:40 -0700 Subject: [PATCH 06/33] updated python to have two static methods for creating a ballista context --- examples/examples/remote-dataframe.rs | 4 +- python/examples/ctx.py | 4 - python/examples/example.py | 28 ++ python/pyballista/__init__.py | 4 +- python/pyballista/context.py | 154 ++++++++++- python/src/context.rs | 371 +++----------------------- python/testdata/test.csv | 0 7 files changed, 209 insertions(+), 356 deletions(-) delete mode 100644 python/examples/ctx.py create mode 100644 python/examples/example.py mode change 100644 => 100755 python/testdata/test.csv diff --git a/examples/examples/remote-dataframe.rs b/examples/examples/remote-dataframe.rs index 4df68a5c3..1acdd9141 100644 --- a/examples/examples/remote-dataframe.rs +++ b/examples/examples/remote-dataframe.rs @@ -16,7 +16,6 @@ // under the License. 
use ballista::prelude::*; -use datafusion::prelude::{col, lit, ParquetReadOptions}; /// This example demonstrates executing a simple query against an Arrow data source (Parquet) and /// fetching results, using the DataFrame trait @@ -30,8 +29,7 @@ async fn main() -> Result<()> { let filename = "testdata/alltypes_plain.parquet"; // define the query using the DataFrame trait - let df = ctx - .sql("SELECT 1").await?; + let df = ctx.sql("SELECT 1").await?; // print the results df.show().await?; diff --git a/python/examples/ctx.py b/python/examples/ctx.py deleted file mode 100644 index b6b345d83..000000000 --- a/python/examples/ctx.py +++ /dev/null @@ -1,4 +0,0 @@ -from pyballista.context import BallistaContext - -ballista = BallistaContext("remote", host="10.103.0.25").sql("SELECT 5434535") -print(ballista) \ No newline at end of file diff --git a/python/examples/example.py b/python/examples/example.py new file mode 100644 index 000000000..86824f1d4 --- /dev/null +++ b/python/examples/example.py @@ -0,0 +1,28 @@ +from datafusion import col +from pyballista.context import BallistaContext +import pyarrow as pa + +import csv +from datafusion.object_store import AmazonS3 +from datafusion.context import SessionContext +import os + +# Create the Ballista Context [standalone or remote] +ctx = BallistaContext().standalone() + +# Register our parquet file to perform SQL operations +ctx.register_parquet("test_parquet", "./testdata/test.parquet") + +# Select the data from our test parquet file +test_parquet = ctx.sql(""" + SELECT * FROM test_parquet +""") + +# Show our test parquet data +print(test_parquet.show()) + +# To perform daatframe operations, read in data +test_csv = ctx.read_csv("./testdata/test.csv", has_header=False) + +# Show the dataframe +test_csv.show() \ No newline at end of file diff --git a/python/pyballista/__init__.py b/python/pyballista/__init__.py index 7e4c9e7c4..52ebcbe85 100644 --- a/python/pyballista/__init__.py +++ b/python/pyballista/__init__.py @@ 
-26,11 +26,11 @@ import pyarrow as pa from .pyballista_internal import ( - BallistaContext, + SessionContext, ) __version__ = importlib_metadata.version(__name__) __all__ = [ - "BallistaContext", + "SessionContext", ] \ No newline at end of file diff --git a/python/pyballista/context.py b/python/pyballista/context.py index 2f644c1cb..a8620673d 100644 --- a/python/pyballista/context.py +++ b/python/pyballista/context.py @@ -20,19 +20,28 @@ import os import time from typing import Iterable +import pathlib +from datafusion import SessionContext import pyarrow as pa - import pyballista from typing import List, Any class BallistaContext: - def __init__(self, mode: str = "standalone", host: str = "0.0.0.0", port: int = 50050, concurrent_tasks: int = 4): - if mode == "standalone": - self.ctx = pyballista.BallistaContext().standalone(concurrent_tasks) - else: - self.ctx = pyballista.BallistaContext().remote(host, port) + def __init__(self, df_ctx: SessionContext = SessionContext()): + self.df_ctx = df_ctx + self.ctx = pyballista.SessionContext(df_ctx) + + def standalone(self) -> SessionContext: + ctx = self.ctx.local() + + return ctx + + def remote(self, url: str) -> SessionContext: + ctx = self.ctx.remote(url) + + return ctx def sql(self, sql: str) -> pa.RecordBatch: # TODO we should parse sql and inspect the plan rather than @@ -43,4 +52,135 @@ def sql(self, sql: str) -> pa.RecordBatch: return [] df = self.ctx.sql(sql) - return df \ No newline at end of file + return df + + def read_csv( + self, + path: str | pathlib.Path | list[str] | list[pathlib.Path], + schema: pa.Schema = None, + has_header: bool = False, + delimeter: str = ",", + schema_infer_max_records: int = 1000, + file_extension: str = ".csv", + table_partition_cols: list[tuple[str, str]] | None = None, + file_compression_type: str | None = None + ): + return self.ctx.read_csv( + path, + schema, + has_header, + delimeter, + schema_infer_max_records, + file_extension, + table_partition_cols, + 
file_compression_type + ) + + def register_csv( + self, + name: str, + path: str | pathlib.Path | list[str] | list[pathlib.Path], + schema: pa.Schema | None = None, + has_header: bool = False, + delimeter: str = ",", + schema_infer_max_records: int = 1000, + file_extension: str = ".csv", + file_compression_type: str | None = None + ): + return self.ctx.register_csv( + name, + path, + schema, + has_header, + delimeter, + schema_infer_max_records, + file_extension, + file_compression_type + ) + + def read_parquet( + self, + path: str | pathlib.Path, + table_partition_cols: list[tuple[str, str]] | None = None, + parquet_pruning: bool = True, + file_extension: str = ".parquet", + skip_metadata: bool = True, + schema: pa.Schema | None = None, + file_sort_order: list[list[str]] | None = None, + ): + """Read a Parquet source into a :py:class:`~datafusion.dataframe.Dataframe`. + + Args: + path: Path to the Parquet file. + table_partition_cols: Partition columns. + parquet_pruning: Whether the parquet reader should use the predicate + to prune row groups. + file_extension: File extension; only files with this extension are + selected for data input. + skip_metadata: Whether the parquet reader should skip any metadata + that may be in the file schema. This can help avoid schema + conflicts due to metadata. + schema: An optional schema representing the parquet files. If None, + the parquet reader will try to infer it based on data in the + file. + file_sort_order: Sort order for the file. 
+ + Returns: + DataFrame representation of the read Parquet files + """ + if table_partition_cols is None: + table_partition_cols = [] + return self.ctx.read_parquet( + str(path), + table_partition_cols, + parquet_pruning, + file_extension, + skip_metadata, + schema, + file_sort_order, + ) + + def register_parquet( + self, + name: str, + path: str | pathlib.Path, + table_partition_cols: list[tuple[str, str]] | None = None, + parquet_pruning: bool = True, + file_extension: str = ".parquet", + skip_metadata: bool = True, + schema: pa.Schema | None = None, + file_sort_order: list[list[str]] | None = None, + ) -> None: + """Register a Parquet file as a table. + + The registered table can be referenced from SQL statement executed + against this context. + + Args: + name: Name of the table to register. + path: Path to the Parquet file. + table_partition_cols: Partition columns. + parquet_pruning: Whether the parquet reader should use the + predicate to prune row groups. + file_extension: File extension; only files with this extension are + selected for data input. + skip_metadata: Whether the parquet reader should skip any metadata + that may be in the file schema. This can help avoid schema + conflicts due to metadata. + schema: The data source schema. + file_sort_order: Sort order for the file. + """ + if table_partition_cols is None: + table_partition_cols = [] + self.ctx.register_parquet( + name, + str(path), + table_partition_cols, + parquet_pruning, + file_extension, + skip_metadata, + schema, + file_sort_order, + ) + + \ No newline at end of file diff --git a/python/src/context.rs b/python/src/context.rs index c47211448..18defe5de 100644 --- a/python/src/context.rs +++ b/python/src/context.rs @@ -15,362 +15,53 @@ // specific language governing permissions and limitations // under the License. 
-use crate::utils::to_pyerr; -use datafusion::logical_expr::SortExpr; -use pyo3::exceptions::PyValueError; use pyo3::prelude::*; -use std::path::PathBuf; -use std::sync::Mutex; -use ballista::context::BallistaContext; use ballista::prelude::*; -use datafusion::arrow::datatypes::Schema; -use datafusion::arrow::pyarrow::PyArrowType; use datafusion::prelude::*; -use datafusion_python::catalog::PyTable; -use datafusion_python::context::{ - convert_table_partition_cols, parse_file_compression_type, -}; -use datafusion_python::dataframe::PyDataFrame; -use datafusion_python::errors::DataFusionError; -use datafusion_python::expr::sort_expr::PySortExpr; -use datafusion_python::sql::logical::PyLogicalPlan; +use datafusion_python::context::PySessionContext as DataFusionPythonSessionContext; use datafusion_python::utils::wait_for_future; -use ballista_core::utils::create_df_ctx_with_ballista_query_planner; - -/// PyBallista session context. This is largely a duplicate of -/// DataFusion's PySessionContext, with the main difference being -/// that this operates on a BallistaContext instead of DataFusion's -/// SessionContext. We could probably add extra extension points to -/// DataFusion to allow for a pluggable context and remove much of -/// this code. -#[pyclass(name = "BallistaContext", module = "pyballista", subclass)] +/// PyBallista session context. This SessionContext will +/// inherit from a Python Object, specifically the DataFusion +/// Python SessionContext. This will allow us to only need to +/// define whether the Context is operating locally or remotely +/// and all the functions for the SessionContext can be defined +/// in the Python Class. 
+#[pyclass(name = "SessionContext", module = "pyballista", subclass)] pub struct PySessionContext { - ctx: BallistaContext, + /// Inherit the Datafusion Python Session Context from your + /// Python Runtime + pub(crate) py_ctx: PyObject, } +/// We only need to provide the cluster type to the SessionContext +/// since all of the methods will inherit form the DataFusion +/// Python library #[pymethods] impl PySessionContext { + /// Provide the Python DataFusion SessionContext to + /// the PySessionContext struct #[new] - #[pyo3(signature = (concurrent_tasks=4))] - pub fn new(concurrent_tasks: usize, py: Python) -> PyResult { - let config = BallistaConfig::new().unwrap(); - let ballista_context = BallistaContext::standalone(&config, concurrent_tasks); - let ctx = wait_for_future(py, ballista_context).map_err(to_pyerr)?; - Ok(Self { ctx }) - } - - #[staticmethod] - #[pyo3(signature = (host="0.0.0.0", port=50500))] - pub fn remote(host: Option<&str>, port: Option, py: Python) -> PyResult { - let config = BallistaConfig::new().unwrap(); - let ballista_context = - BallistaContext::remote(host.unwrap(), port.unwrap(), &config); - let ctx = wait_for_future(py, ballista_context).map_err(to_pyerr)?; - Ok(Self { ctx }) + pub fn new(session_ctx: PyObject) -> PyResult { + Ok(Self { + py_ctx: session_ctx, + }) } + /// Provide Context for local execution #[staticmethod] - #[pyo3(signature = (concurrent_tasks=4))] - pub fn standalone(concurrent_tasks: usize, py: Python) -> PyResult { - let config = BallistaConfig::new().unwrap(); - let ballista_context = BallistaContext::standalone(&config, concurrent_tasks); - let ctx = wait_for_future(py, ballista_context).map_err(to_pyerr)?; - Ok(Self { ctx }) - } - - pub fn sql(&mut self, query: &str, py: Python) -> PyResult { - let result = self.ctx.sql(query); - let df = wait_for_future(py, result)?; - Ok(PyDataFrame::new(df)) - } - - #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (path, schema=None, table_partition_cols=vec![], 
file_extension=".avro"))] - pub fn read_avro( - &self, - path: &str, - schema: Option>, - table_partition_cols: Vec<(String, String)>, - file_extension: &str, - py: Python, - ) -> PyResult { - let mut options = AvroReadOptions::default() - .table_partition_cols(convert_table_partition_cols(table_partition_cols)?); - options.file_extension = file_extension; - let df = if let Some(schema) = schema { - options.schema = Some(&schema.0); - let read_future = self.ctx.read_avro(path, options); - wait_for_future(py, read_future).map_err(DataFusionError::from)? - } else { - let read_future = self.ctx.read_avro(path, options); - wait_for_future(py, read_future).map_err(DataFusionError::from)? - }; - Ok(PyDataFrame::new(df)) - } - - #[allow(clippy::too_many_arguments)] - #[pyo3(signature = ( - path, - schema=None, - has_header=true, - delimiter=",", - schema_infer_max_records=1000, - file_extension=".csv", - table_partition_cols=vec![], - file_compression_type=None))] - pub fn read_csv( - &self, - path: PathBuf, - schema: Option>, - has_header: bool, - delimiter: &str, - schema_infer_max_records: usize, - file_extension: &str, - table_partition_cols: Vec<(String, String)>, - file_compression_type: Option, - py: Python, - ) -> PyResult { - let path = path - .to_str() - .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; - - let delimiter = delimiter.as_bytes(); - if delimiter.len() != 1 { - return Err(PyValueError::new_err( - "Delimiter must be a single character", - )); - }; - - let mut options = CsvReadOptions::new() - .has_header(has_header) - .delimiter(delimiter[0]) - .schema_infer_max_records(schema_infer_max_records) - .file_extension(file_extension) - .table_partition_cols(convert_table_partition_cols(table_partition_cols)?) 
- .file_compression_type(parse_file_compression_type(file_compression_type)?); - - if let Some(py_schema) = schema { - options.schema = Some(&py_schema.0); - let result = self.ctx.read_csv(path, options); - let df = PyDataFrame::new(wait_for_future(py, result)?); - Ok(df) - } else { - let result = self.ctx.read_csv(path, options); - let df = PyDataFrame::new(wait_for_future(py, result)?); - Ok(df) - } - } - - #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (path, schema=None, schema_infer_max_records=1000, file_extension=".json", table_partition_cols=vec![], file_compression_type=None))] - pub fn read_json( - &mut self, - path: PathBuf, - schema: Option>, - schema_infer_max_records: usize, - file_extension: &str, - table_partition_cols: Vec<(String, String)>, - file_compression_type: Option, - py: Python, - ) -> PyResult { - let path = path - .to_str() - .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; - let mut options = NdJsonReadOptions::default() - .table_partition_cols(convert_table_partition_cols(table_partition_cols)?) - .file_compression_type(parse_file_compression_type(file_compression_type)?); - options.schema_infer_max_records = schema_infer_max_records; - options.file_extension = file_extension; - let df = if let Some(schema) = schema { - options.schema = Some(&schema.0); - let result = self.ctx.read_json(path, options); - wait_for_future(py, result).map_err(DataFusionError::from)? - } else { - let result = self.ctx.read_json(path, options); - wait_for_future(py, result).map_err(DataFusionError::from)? 
- }; - Ok(PyDataFrame::new(df)) - } - - #[allow(clippy::too_many_arguments)] - #[pyo3(signature = ( - path, - table_partition_cols=vec![], - parquet_pruning=true, - file_extension=".parquet", - skip_metadata=true, - schema=None, - file_sort_order=None))] - pub fn read_parquet( - &self, - path: &str, - table_partition_cols: Vec<(String, String)>, - parquet_pruning: bool, - file_extension: &str, - skip_metadata: bool, - schema: Option>, - file_sort_order: Option>>, - py: Python, - ) -> PyResult { - let mut options = ParquetReadOptions::default() - .table_partition_cols(convert_table_partition_cols(table_partition_cols)?) - .parquet_pruning(parquet_pruning) - .skip_metadata(skip_metadata); - options.file_extension = file_extension; - options.schema = schema.as_ref().map(|x| &x.0); - options.file_sort_order = file_sort_order - .unwrap_or_default() - .into_iter() - .map(|e| { - e.into_iter() - .map(|f| { - let sort_expr: SortExpr = f.into(); - sort_expr - }) - .collect() - }) - .collect(); - - let result = self.ctx.read_parquet(path, options); - let df = - PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?); - Ok(df) + pub fn local(py: Python) -> PyResult { + let session_context = SessionContext::standalone(); + let ctx = wait_for_future(py, session_context)?; + Ok(ctx.into()) } - #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (name, - path, - schema=None, - file_extension=".avro", - table_partition_cols=vec![]))] - pub fn register_avro( - &mut self, - name: &str, - path: PathBuf, - schema: Option>, - file_extension: &str, - table_partition_cols: Vec<(String, String)>, - py: Python, - ) -> PyResult<()> { - let path = path - .to_str() - .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; - - let mut options = AvroReadOptions::default() - .table_partition_cols(convert_table_partition_cols(table_partition_cols)?); - options.file_extension = file_extension; - options.schema = schema.as_ref().map(|x| &x.0); - - 
let result = self.ctx.register_avro(name, path, options); - wait_for_future(py, result).map_err(DataFusionError::from)?; - - Ok(()) - } - - #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (name, - path, - schema=None, - has_header=true, - delimiter=",", - schema_infer_max_records=1000, - file_extension=".csv", - file_compression_type=None))] - pub fn register_csv( - &mut self, - name: &str, - path: PathBuf, - schema: Option>, - has_header: bool, - delimiter: &str, - schema_infer_max_records: usize, - file_extension: &str, - file_compression_type: Option, - py: Python, - ) -> PyResult<()> { - let path = path - .to_str() - .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; - let delimiter = delimiter.as_bytes(); - if delimiter.len() != 1 { - return Err(PyValueError::new_err( - "Delimiter must be a single character", - )); - } - - let mut options = CsvReadOptions::new() - .has_header(has_header) - .delimiter(delimiter[0]) - .schema_infer_max_records(schema_infer_max_records) - .file_extension(file_extension) - .file_compression_type(parse_file_compression_type(file_compression_type)?); - options.schema = schema.as_ref().map(|x| &x.0); - - let result = self.ctx.register_csv(name, path, options); - wait_for_future(py, result).map_err(DataFusionError::from)?; - - Ok(()) - } - - #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (name, path, table_partition_cols=vec![], - parquet_pruning=true, - file_extension=".parquet", - skip_metadata=true, - schema=None, - file_sort_order=None))] - pub fn register_parquet( - &mut self, - name: &str, - path: &str, - table_partition_cols: Vec<(String, String)>, - parquet_pruning: bool, - file_extension: &str, - skip_metadata: bool, - schema: Option>, - file_sort_order: Option>>, - py: Python, - ) -> PyResult<()> { - let mut options = ParquetReadOptions::default() - .table_partition_cols(convert_table_partition_cols(table_partition_cols)?) 
- .parquet_pruning(parquet_pruning) - .skip_metadata(skip_metadata); - options.file_extension = file_extension; - options.schema = schema.as_ref().map(|x| &x.0); - options.file_sort_order = file_sort_order - .unwrap_or_default() - .into_iter() - .map(|e| { - e.into_iter() - .map(|f| { - let sort_expr: SortExpr = f.into(); - sort_expr - }) - .collect() - }) - .collect(); - - let result = self.ctx.register_parquet(name, path, options); - wait_for_future(py, result).map_err(DataFusionError::from)?; - Ok(()) - } - - pub fn register_table(&mut self, name: &str, table: &PyTable) -> PyResult<()> { - self.ctx - .register_table(name, table.table()) - .map_err(DataFusionError::from)?; - Ok(()) - } - - pub fn execute_logical_plan( - &mut self, - logical_plan: PyLogicalPlan, - py: Python, - ) -> PyResult { - let result = self.ctx.execute_logical_plan(logical_plan.into()); - let df = wait_for_future(py, result).unwrap(); - Ok(PyDataFrame::new(df)) + /// Provide Context for remote execution + #[staticmethod] + pub fn remote(py: Python, url: &str) -> PyResult { + let session_context = SessionContext::remote(url); + let ctx = wait_for_future(py, session_context)?; + Ok(ctx.into()) } } diff --git a/python/testdata/test.csv b/python/testdata/test.csv old mode 100644 new mode 100755 From 0ecd91fbf9aa782042885b2b7a1fef9adc4d5071 Mon Sep 17 00:00:00 2001 From: Trevor Barnes Date: Fri, 1 Nov 2024 09:45:41 -0700 Subject: [PATCH 07/33] updated python to have two static methods for creating a ballista context --- examples/examples/remote-dataframe.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/examples/examples/remote-dataframe.rs b/examples/examples/remote-dataframe.rs index 1acdd9141..6b190cea6 100644 --- a/examples/examples/remote-dataframe.rs +++ b/examples/examples/remote-dataframe.rs @@ -16,6 +16,7 @@ // under the License. 
use ballista::prelude::*; +use datafusion::prelude::{col, lit, ParquetReadOptions}; /// This example demonstrates executing a simple query against an Arrow data source (Parquet) and /// fetching results, using the DataFrame trait @@ -24,12 +25,16 @@ async fn main() -> Result<()> { let config = BallistaConfig::builder() .set("ballista.shuffle.partitions", "4") .build()?; - let ctx = BallistaContext::remote("10.103.0.25", 50050, &config).await?; + let ctx = BallistaContext::remote("localhost", 50050, &config).await?; let filename = "testdata/alltypes_plain.parquet"; // define the query using the DataFrame trait - let df = ctx.sql("SELECT 1").await?; + let df = ctx + .read_parquet(filename, ParquetReadOptions::default()) + .await? + .select_columns(&["id", "bool_col", "timestamp_col"])? + .filter(col("id").gt(lit(1)))?; // print the results df.show().await?; From 0e0a60ffcb40c41fd76d4e6d91fb7d816a916f58 Mon Sep 17 00:00:00 2001 From: Trevor Barnes Date: Fri, 1 Nov 2024 09:47:21 -0700 Subject: [PATCH 08/33] updated python to have two static methods for creating a ballista context --- python/examples/example.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/python/examples/example.py b/python/examples/example.py index 86824f1d4..629bf6e3f 100644 --- a/python/examples/example.py +++ b/python/examples/example.py @@ -1,3 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + from datafusion import col from pyballista.context import BallistaContext import pyarrow as pa From 0028c46963e33579ccf89d3a31cf8fd4cf136618 Mon Sep 17 00:00:00 2001 From: Trevor Barnes Date: Fri, 1 Nov 2024 10:05:09 -0700 Subject: [PATCH 09/33] updated python to have two static methods for creating a ballista context --- docs/source/user-guide/python.md | 13 ++++++++----- python/examples/example.py | 7 ------- python/pyballista/context.py | 3 +++ 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/docs/source/user-guide/python.md b/docs/source/user-guide/python.md index 674850c70..7048fe4e0 100644 --- a/docs/source/user-guide/python.md +++ b/docs/source/user-guide/python.md @@ -28,9 +28,11 @@ popular file formats files, run it in a distributed environment, and obtain the The following code demonstrates how to create a Ballista context and connect to a scheduler. +If you are running a standalone cluster (runs locally), all you need to do is call the stand alone cluster method `standalone()` or your BallistaContext. If you are running a cluster in remote mode, you need to provide the URL `BallistaContext.remote("http://my-remote-ip:50050")`. + ```text ->>> import ballista ->>> ctx = ballista.BallistaContext("localhost", 50050) +>>> from pyballista.context import BallistaContext +>>> ctx = BallistaContext().standalone() ``` ## SQL @@ -103,14 +105,15 @@ The `explain` method can be used to show the logical and physical query plans fo The following example demonstrates creating arrays with PyArrow and then creating a Ballista DataFrame. 
```python -import ballista +import pyballista import pyarrow # an alias -f = ballista.functions +# TODO implement Functions +f = pyballista.functions # create a context -ctx = ballista.BallistaContext("localhost", 50050) +ctx = pyballista.SessionContext().standalone() # create a RecordBatch and a new DataFrame from it batch = pyarrow.RecordBatch.from_arrays( diff --git a/python/examples/example.py b/python/examples/example.py index 629bf6e3f..e6a4746ad 100644 --- a/python/examples/example.py +++ b/python/examples/example.py @@ -15,14 +15,7 @@ # specific language governing permissions and limitations # under the License. -from datafusion import col from pyballista.context import BallistaContext -import pyarrow as pa - -import csv -from datafusion.object_store import AmazonS3 -from datafusion.context import SessionContext -import os # Create the Ballista Context [standalone or remote] ctx = BallistaContext().standalone() diff --git a/python/pyballista/context.py b/python/pyballista/context.py index a8620673d..ad860aee7 100644 --- a/python/pyballista/context.py +++ b/python/pyballista/context.py @@ -182,5 +182,8 @@ def register_parquet( schema, file_sort_order, ) + + def functions(self): + self.ctx.functions \ No newline at end of file From 7726d3ebe6c26bf3f0c840fba20965358372f95a Mon Sep 17 00:00:00 2001 From: Trevor Barnes Date: Fri, 1 Nov 2024 14:01:52 -0700 Subject: [PATCH 10/33] updated python to have two static methods for creating a ballista context --- python/examples/example.py | 9 +- python/pyballista/__init__.py | 6 +- python/pyballista/context.py | 174 ++-------------------------------- python/src/context.rs | 67 ------------- python/src/lib.rs | 37 +++++++- 5 files changed, 51 insertions(+), 242 deletions(-) delete mode 100644 python/src/context.rs diff --git a/python/examples/example.py b/python/examples/example.py index e6a4746ad..933de97f7 100644 --- a/python/examples/example.py +++ b/python/examples/example.py @@ -15,18 +15,17 @@ # specific language 
governing permissions and limitations # under the License. -from pyballista.context import BallistaContext +from pyballista import StandaloneBallista, RemoteBallista +from datafusion.context import SessionContext # Create the Ballista Context [standalone or remote] -ctx = BallistaContext().standalone() +ctx: SessionContext = StandaloneBallista.build() # Register our parquet file to perform SQL operations ctx.register_parquet("test_parquet", "./testdata/test.parquet") # Select the data from our test parquet file -test_parquet = ctx.sql(""" - SELECT * FROM test_parquet -""") +test_parquet = ctx.sql("SELECT * FROM test_parquet") # Show our test parquet data print(test_parquet.show()) diff --git a/python/pyballista/__init__.py b/python/pyballista/__init__.py index 52ebcbe85..1e259566b 100644 --- a/python/pyballista/__init__.py +++ b/python/pyballista/__init__.py @@ -26,11 +26,13 @@ import pyarrow as pa from .pyballista_internal import ( - SessionContext, + StandaloneBallista, + RemoteBallista ) __version__ = importlib_metadata.version(__name__) __all__ = [ - "SessionContext", + "StandaloneBallista", + "RemoteBallista" ] \ No newline at end of file diff --git a/python/pyballista/context.py b/python/pyballista/context.py index ad860aee7..e70bac5ec 100644 --- a/python/pyballista/context.py +++ b/python/pyballista/context.py @@ -15,175 +15,21 @@ # specific language governing permissions and limitations # under the License. 
-from contextlib import ContextDecorator -import json -import os -import time -from typing import Iterable -import pathlib - from datafusion import SessionContext -import pyarrow as pa import pyballista from typing import List, Any -class BallistaContext: - def __init__(self, df_ctx: SessionContext = SessionContext()): - self.df_ctx = df_ctx - self.ctx = pyballista.SessionContext(df_ctx) - - def standalone(self) -> SessionContext: - ctx = self.ctx.local() - - return ctx - - def remote(self, url: str) -> SessionContext: - ctx = self.ctx.remote(url) - - return ctx - - def sql(self, sql: str) -> pa.RecordBatch: - # TODO we should parse sql and inspect the plan rather than - # perform a string comparison here - sql_str = sql.lower() - if "create view" in sql_str or "drop view" in sql_str: - self.ctx.sql(sql) - return [] - - df = self.ctx.sql(sql) - return df +class StandaloneBallista: + def __init__(self): + self.ctx = pyballista.StandaloneBallista - def read_csv( - self, - path: str | pathlib.Path | list[str] | list[pathlib.Path], - schema: pa.Schema = None, - has_header: bool = False, - delimeter: str = ",", - schema_infer_max_records: int = 1000, - file_extension: str = ".csv", - table_partition_cols: list[tuple[str, str]] | None = None, - file_compression_type: str | None = None - ): - return self.ctx.read_csv( - path, - schema, - has_header, - delimeter, - schema_infer_max_records, - file_extension, - table_partition_cols, - file_compression_type - ) - - def register_csv( - self, - name: str, - path: str | pathlib.Path | list[str] | list[pathlib.Path], - schema: pa.Schema | None = None, - has_header: bool = False, - delimeter: str = ",", - schema_infer_max_records: int = 1000, - file_extension: str = ".csv", - file_compression_type: str | None = None - ): - return self.ctx.register_csv( - name, - path, - schema, - has_header, - delimeter, - schema_infer_max_records, - file_extension, - file_compression_type - ) - - def read_parquet( - self, - path: str | 
pathlib.Path, - table_partition_cols: list[tuple[str, str]] | None = None, - parquet_pruning: bool = True, - file_extension: str = ".parquet", - skip_metadata: bool = True, - schema: pa.Schema | None = None, - file_sort_order: list[list[str]] | None = None, - ): - """Read a Parquet source into a :py:class:`~datafusion.dataframe.Dataframe`. - - Args: - path: Path to the Parquet file. - table_partition_cols: Partition columns. - parquet_pruning: Whether the parquet reader should use the predicate - to prune row groups. - file_extension: File extension; only files with this extension are - selected for data input. - skip_metadata: Whether the parquet reader should skip any metadata - that may be in the file schema. This can help avoid schema - conflicts due to metadata. - schema: An optional schema representing the parquet files. If None, - the parquet reader will try to infer it based on data in the - file. - file_sort_order: Sort order for the file. - - Returns: - DataFrame representation of the read Parquet files - """ - if table_partition_cols is None: - table_partition_cols = [] - return self.ctx.read_parquet( - str(path), - table_partition_cols, - parquet_pruning, - file_extension, - skip_metadata, - schema, - file_sort_order, - ) - - def register_parquet( - self, - name: str, - path: str | pathlib.Path, - table_partition_cols: list[tuple[str, str]] | None = None, - parquet_pruning: bool = True, - file_extension: str = ".parquet", - skip_metadata: bool = True, - schema: pa.Schema | None = None, - file_sort_order: list[list[str]] | None = None, - ) -> None: - """Register a Parquet file as a table. - - The registered table can be referenced from SQL statement executed - against this context. - - Args: - name: Name of the table to register. - path: Path to the Parquet file. - table_partition_cols: Partition columns. - parquet_pruning: Whether the parquet reader should use the - predicate to prune row groups. 
- file_extension: File extension; only files with this extension are - selected for data input. - skip_metadata: Whether the parquet reader should skip any metadata - that may be in the file schema. This can help avoid schema - conflicts due to metadata. - schema: The data source schema. - file_sort_order: Sort order for the file. - """ - if table_partition_cols is None: - table_partition_cols = [] - self.ctx.register_parquet( - name, - str(path), - table_partition_cols, - parquet_pruning, - file_extension, - skip_metadata, - schema, - file_sort_order, - ) + def build(self): + return self.ctx.build() - def functions(self): - self.ctx.functions +class RemoteBallista: + def __init__(self): + self.ctx = pyballista.RemoteBallista - \ No newline at end of file + def build(self, url: str): + return self.ctx.build(url) \ No newline at end of file diff --git a/python/src/context.rs b/python/src/context.rs deleted file mode 100644 index 18defe5de..000000000 --- a/python/src/context.rs +++ /dev/null @@ -1,67 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -use pyo3::prelude::*; - -use ballista::prelude::*; -use datafusion::prelude::*; -use datafusion_python::context::PySessionContext as DataFusionPythonSessionContext; -use datafusion_python::utils::wait_for_future; - -/// PyBallista session context. This SessionContext will -/// inherit from a Python Object, specifically the DataFusion -/// Python SessionContext. This will allow us to only need to -/// define whether the Context is operating locally or remotely -/// and all the functions for the SessionContext can be defined -/// in the Python Class. -#[pyclass(name = "SessionContext", module = "pyballista", subclass)] -pub struct PySessionContext { - /// Inherit the Datafusion Python Session Context from your - /// Python Runtime - pub(crate) py_ctx: PyObject, -} - -/// We only need to provide the cluster type to the SessionContext -/// since all of the methods will inherit form the DataFusion -/// Python library -#[pymethods] -impl PySessionContext { - /// Provide the Python DataFusion SessionContext to - /// the PySessionContext struct - #[new] - pub fn new(session_ctx: PyObject) -> PyResult { - Ok(Self { - py_ctx: session_ctx, - }) - } - - /// Provide Context for local execution - #[staticmethod] - pub fn local(py: Python) -> PyResult { - let session_context = SessionContext::standalone(); - let ctx = wait_for_future(py, session_context)?; - Ok(ctx.into()) - } - - /// Provide Context for remote execution - #[staticmethod] - pub fn remote(py: Python, url: &str) -> PyResult { - let session_context = SessionContext::remote(url); - let ctx = wait_for_future(py, session_context)?; - Ok(ctx.into()) - } -} diff --git a/python/src/lib.rs b/python/src/lib.rs index 5fbd2491b..0a2af5b7e 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -15,18 +15,47 @@ // specific language governing permissions and limitations // under the License. 
+use ballista::prelude::*; +use datafusion::prelude::*; +use datafusion_python::context::PySessionContext as DataFusionPythonSessionContext; +use datafusion_python::utils::wait_for_future; use pyo3::prelude::*; -pub mod context; mod utils; -pub use crate::context::PySessionContext; - #[pymodule] fn pyballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { pyo3_log::init(); // Ballista structs - m.add_class::()?; + m.add_class::()?; + m.add_class::()?; // DataFusion structs m.add_class::()?; Ok(()) } + +#[pyclass(name = "StandaloneBallista", module = "pyballista", subclass)] +pub struct PyStandaloneBallista; + +#[pymethods] +impl PyStandaloneBallista { + #[staticmethod] + pub fn build(py: Python) -> PyResult { + let session_context = SessionContext::standalone(); + let ctx = wait_for_future(py, session_context)?; + Ok(ctx.into()) + } +} + +#[pyclass(name = "RemoteBallista", module = "pyballista", subclass)] +pub struct PyRemoteBallista; + +#[pymethods] +impl PyRemoteBallista { + #[staticmethod] + pub fn build(url: &str, py: Python) -> PyResult { + let session_context = SessionContext::remote(url); + let ctx = wait_for_future(py, session_context)?; + + Ok(ctx.into()) + } +} From 75d02081561560369e5d5161304d2d24ea0ff054 Mon Sep 17 00:00:00 2001 From: Trevor Barnes Date: Fri, 1 Nov 2024 18:44:21 -0700 Subject: [PATCH 11/33] updated python to have two static methods for creating a ballista context --- docs/source/user-guide/python.md | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/docs/source/user-guide/python.md b/docs/source/user-guide/python.md index 7048fe4e0..154ec54f0 100644 --- a/docs/source/user-guide/python.md +++ b/docs/source/user-guide/python.md @@ -31,8 +31,12 @@ The following code demonstrates how to create a Ballista context and connect to If you are running a standalone cluster (runs locally), all you need to do is call the stand alone cluster method `standalone()` or your BallistaContext. 
If you are running a cluster in remote mode, you need to provide the URL `BallistaContext.remote("http://my-remote-ip:50050")`. ```text ->>> from pyballista.context import BallistaContext ->>> ctx = BallistaContext().standalone() +>>> from pyballista import StandaloneBallista, RemoteBallista +>>> # for a standalone instance +>>> ctx = StandaloneBallista.build() +>>> +>>> # for a remote instance provide the URL +>>> ctx = RemoteBallista.build("http://url-path-to-scheduler:50050") ``` ## SQL @@ -105,7 +109,7 @@ The `explain` method can be used to show the logical and physical query plans fo The following example demonstrates creating arrays with PyArrow and then creating a Ballista DataFrame. ```python -import pyballista +from pyballista import StandaloneBallista import pyarrow # an alias @@ -113,7 +117,7 @@ import pyarrow f = pyballista.functions # create a context -ctx = pyballista.SessionContext().standalone() +ctx = StandaloneBallista.build() # create a RecordBatch and a new DataFrame from it batch = pyarrow.RecordBatch.from_arrays( From f8246ddf85a8d3064638f285a2dd5d738d635c46 Mon Sep 17 00:00:00 2001 From: tbar4 Date: Sat, 2 Nov 2024 16:38:46 -0700 Subject: [PATCH 12/33] updating the pyballista package to ballista --- python/{pyballista => ballista}/__init__.py | 2 +- python/{pyballista => ballista}/context.py | 6 +++--- python/{pyballista => ballista}/tests/__init__.py | 0 python/{pyballista => ballista}/tests/test_context.py | 2 +- python/examples/example.py | 2 +- python/src/lib.rs | 6 +++--- 6 files changed, 9 insertions(+), 9 deletions(-) rename python/{pyballista => ballista}/__init__.py (97%) rename python/{pyballista => ballista}/context.py (90%) rename python/{pyballista => ballista}/tests/__init__.py (100%) rename python/{pyballista => ballista}/tests/test_context.py (98%) diff --git a/python/pyballista/__init__.py b/python/ballista/__init__.py similarity index 97% rename from python/pyballista/__init__.py rename to python/ballista/__init__.py index 
1e259566b..4d469c3fb 100644 --- a/python/pyballista/__init__.py +++ b/python/ballista/__init__.py @@ -25,7 +25,7 @@ import pyarrow as pa -from .pyballista_internal import ( +from .ballista_internal import ( StandaloneBallista, RemoteBallista ) diff --git a/python/pyballista/context.py b/python/ballista/context.py similarity index 90% rename from python/pyballista/context.py rename to python/ballista/context.py index e70bac5ec..f383cd07d 100644 --- a/python/pyballista/context.py +++ b/python/ballista/context.py @@ -16,20 +16,20 @@ # under the License. from datafusion import SessionContext -import pyballista +import ballista from typing import List, Any class StandaloneBallista: def __init__(self): - self.ctx = pyballista.StandaloneBallista + self.ctx = ballista.StandaloneBallista def build(self): return self.ctx.build() class RemoteBallista: def __init__(self): - self.ctx = pyballista.RemoteBallista + self.ctx = ballista.RemoteBallista def build(self, url: str): return self.ctx.build(url) \ No newline at end of file diff --git a/python/pyballista/tests/__init__.py b/python/ballista/tests/__init__.py similarity index 100% rename from python/pyballista/tests/__init__.py rename to python/ballista/tests/__init__.py diff --git a/python/pyballista/tests/test_context.py b/python/ballista/tests/test_context.py similarity index 98% rename from python/pyballista/tests/test_context.py rename to python/ballista/tests/test_context.py index b440bb270..97d90e42b 100644 --- a/python/pyballista/tests/test_context.py +++ b/python/ballista/tests/test_context.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. 
-from pyballista import SessionContext +from ballista import SessionContext import pytest def test_create_context(): diff --git a/python/examples/example.py b/python/examples/example.py index 933de97f7..3e554324b 100644 --- a/python/examples/example.py +++ b/python/examples/example.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -from pyballista import StandaloneBallista, RemoteBallista +from ballista import StandaloneBallista, RemoteBallista from datafusion.context import SessionContext # Create the Ballista Context [standalone or remote] diff --git a/python/src/lib.rs b/python/src/lib.rs index 0a2af5b7e..4aaea15d5 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -23,7 +23,7 @@ use pyo3::prelude::*; mod utils; #[pymodule] -fn pyballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { +fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { pyo3_log::init(); // Ballista structs m.add_class::()?; @@ -33,7 +33,7 @@ fn pyballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { Ok(()) } -#[pyclass(name = "StandaloneBallista", module = "pyballista", subclass)] +#[pyclass(name = "StandaloneBallista", module = "ballista", subclass)] pub struct PyStandaloneBallista; #[pymethods] @@ -46,7 +46,7 @@ impl PyStandaloneBallista { } } -#[pyclass(name = "RemoteBallista", module = "pyballista", subclass)] +#[pyclass(name = "RemoteBallista", module = "ballista", subclass)] pub struct PyRemoteBallista; #[pymethods] From bef917058ca672ec929a07257d03d7d5996dc0f2 Mon Sep 17 00:00:00 2001 From: tbar4 Date: Sat, 2 Nov 2024 17:33:42 -0700 Subject: [PATCH 13/33] changing the packagaing naming convention from pyballista to ballista --- python/Cargo.toml | 6 ++---- python/README.md | 2 +- python/pyproject.toml | 6 +++--- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/python/Cargo.toml b/python/Cargo.toml index c14e94757..9303ebafa 100644 --- a/python/Cargo.toml +++ 
b/python/Cargo.toml @@ -26,7 +26,7 @@ readme = "README.md" license = "Apache-2.0" edition = "2021" rust-version = "1.72" -include = ["/src", "/pyballista", "/LICENSE.txt", "pyproject.toml", "Cargo.toml", "Cargo.lock"] +include = ["/src", "/ballista", "/LICENSE.txt", "pyproject.toml", "Cargo.toml", "Cargo.lock"] publish = false [dependencies] @@ -43,6 +43,4 @@ tokio = { version = "1.35", features = ["macros", "rt", "rt-multi-thread", "sync [lib] crate-type = ["cdylib"] -name = "pyballista" - - +name = "ballista" \ No newline at end of file diff --git a/python/README.md b/python/README.md index 2898cb165..b5f6b787d 100644 --- a/python/README.md +++ b/python/README.md @@ -29,7 +29,7 @@ part of the default Cargo workspace so that it doesn't cause overhead for mainta Creates a new context and connects to a Ballista scheduler process. ```python -from pyballista import SessionContext +from ballista import RemoteBallista, StandaloneBallista >>> ctx = SessionContext("localhost", 50050) ``` diff --git a/python/pyproject.toml b/python/pyproject.toml index dbb76e59d..2d06b225d 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -20,7 +20,7 @@ requires = ["maturin>=0.15,<0.16"] build-backend = "maturin" [project] -name = "pyballista" +name = "ballista" description = "Python client for Apache Arrow Ballista Distributed SQL Query Engine" readme = "README.md" license = {file = "LICENSE.txt"} @@ -55,10 +55,10 @@ repository = "https://github.com/apache/arrow-ballista" profile = "black" [tool.maturin] -module-name = "pyballista.pyballista_internal" +module-name = "ballista.ballista_internal" include = [ { path = "Cargo.lock", format = "sdist" } ] exclude = [".github/**", "ci/**", ".asf.yaml"] # Require Cargo.lock is up to date -locked = true +locked = true \ No newline at end of file From 2da53d4a8169129d7935b1bda1f8a8f59dbb234e Mon Sep 17 00:00:00 2001 From: tbar4 Date: Sat, 2 Nov 2024 18:17:09 -0700 Subject: [PATCH 14/33] changing the packagaing naming convention from 
pyballista to ballista --- python/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/Cargo.toml b/python/Cargo.toml index 9303ebafa..b03f1e997 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -43,4 +43,4 @@ tokio = { version = "1.35", features = ["macros", "rt", "rt-multi-thread", "sync [lib] crate-type = ["cdylib"] -name = "ballista" \ No newline at end of file +name = "ballista" From dbd67e5d3b2b2a47999272d18b9798031cfa4e6b Mon Sep 17 00:00:00 2001 From: Trevor Barnes Date: Wed, 6 Nov 2024 15:31:29 -0800 Subject: [PATCH 15/33] updated python to have two static methods for creating a ballista context --- docs/source/user-guide/python.md | 10 +++++----- python/ballista/__init__.py | 6 ++---- python/ballista/context.py | 18 +++++++---------- python/examples/example.py | 8 ++++---- python/src/lib.rs | 34 ++++++++++++++++---------------- 5 files changed, 35 insertions(+), 41 deletions(-) diff --git a/docs/source/user-guide/python.md b/docs/source/user-guide/python.md index 154ec54f0..5f7a1ecff 100644 --- a/docs/source/user-guide/python.md +++ b/docs/source/user-guide/python.md @@ -31,12 +31,12 @@ The following code demonstrates how to create a Ballista context and connect to If you are running a standalone cluster (runs locally), all you need to do is call the stand alone cluster method `standalone()` or your BallistaContext. If you are running a cluster in remote mode, you need to provide the URL `BallistaContext.remote("http://my-remote-ip:50050")`. 
```text ->>> from pyballista import StandaloneBallista, RemoteBallista +>>> from ballista import Ballista >>> # for a standalone instance ->>> ctx = StandaloneBallista.build() +>>> ctx = Ballista.standalone() >>> >>> # for a remote instance provide the URL ->>> ctx = RemoteBallista.build("http://url-path-to-scheduler:50050") +>>> ctx = Ballista.remote("http://url-path-to-scheduler:50050") ``` ## SQL @@ -109,12 +109,12 @@ The `explain` method can be used to show the logical and physical query plans fo The following example demonstrates creating arrays with PyArrow and then creating a Ballista DataFrame. ```python -from pyballista import StandaloneBallista +from ballista import Ballista import pyarrow # an alias # TODO implement Functions -f = pyballista.functions +f = ballista.functions # create a context ctx = StandaloneBallista.build() diff --git a/python/ballista/__init__.py b/python/ballista/__init__.py index 4d469c3fb..82655d9b1 100644 --- a/python/ballista/__init__.py +++ b/python/ballista/__init__.py @@ -26,13 +26,11 @@ import pyarrow as pa from .ballista_internal import ( - StandaloneBallista, - RemoteBallista + Ballista ) __version__ = importlib_metadata.version(__name__) __all__ = [ - "StandaloneBallista", - "RemoteBallista" + "Ballista" ] \ No newline at end of file diff --git a/python/ballista/context.py b/python/ballista/context.py index f383cd07d..4854e8ad3 100644 --- a/python/ballista/context.py +++ b/python/ballista/context.py @@ -19,17 +19,13 @@ import ballista from typing import List, Any - -class StandaloneBallista: - def __init__(self): - self.ctx = ballista.StandaloneBallista - - def build(self): - return self.ctx.build() -class RemoteBallista: +class Ballista: def __init__(self): - self.ctx = ballista.RemoteBallista + self.ctx = ballista.Ballista - def build(self, url: str): - return self.ctx.build(url) \ No newline at end of file + def standalone(self): + return self.ctx.standalone() + + def remote(self, url: str): + return 
self.ctx.remote(url) \ No newline at end of file diff --git a/python/examples/example.py b/python/examples/example.py index 3e554324b..070bf3d7c 100644 --- a/python/examples/example.py +++ b/python/examples/example.py @@ -15,11 +15,11 @@ # specific language governing permissions and limitations # under the License. -from ballista import StandaloneBallista, RemoteBallista +from ballista import StandaloneBallista, RemoteBallista, Ballista from datafusion.context import SessionContext # Create the Ballista Context [standalone or remote] -ctx: SessionContext = StandaloneBallista.build() +ctx: SessionContext = Ballista.standalone() # Ballista.remote() # Register our parquet file to perform SQL operations ctx.register_parquet("test_parquet", "./testdata/test.parquet") @@ -30,8 +30,8 @@ # Show our test parquet data print(test_parquet.show()) -# To perform daatframe operations, read in data +# To perform dataframe operations, read in data test_csv = ctx.read_csv("./testdata/test.csv", has_header=False) # Show the dataframe -test_csv.show() \ No newline at end of file +test_csv.show() diff --git a/python/src/lib.rs b/python/src/lib.rs index 4aaea15d5..9564e6835 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -26,36 +26,36 @@ mod utils; fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { pyo3_log::init(); // Ballista structs - m.add_class::()?; - m.add_class::()?; + m.add_class::()?; // DataFusion structs m.add_class::()?; Ok(()) } -#[pyclass(name = "StandaloneBallista", module = "ballista", subclass)] -pub struct PyStandaloneBallista; +#[pyclass(name = "Ballista", module = "ballista", subclass)] +pub struct PyBallista; #[pymethods] -impl PyStandaloneBallista { +impl PyBallista { #[staticmethod] - pub fn build(py: Python) -> PyResult { + /// Construct the standalone instance from the SessionContext + pub fn standalone(py: Python) -> PyResult { + // Define the SessionContext let session_context = SessionContext::standalone(); - let ctx = 
wait_for_future(py, session_context)?; + // SessionContext is an async function + let ctx = wait_for_future(py, session_context).unwrap(); + + // Convert the SessionContext into a Python SessionContext Ok(ctx.into()) } -} - -#[pyclass(name = "RemoteBallista", module = "ballista", subclass)] -pub struct PyRemoteBallista; - -#[pymethods] -impl PyRemoteBallista { + #[staticmethod] - pub fn build(url: &str, py: Python) -> PyResult { + /// Construct the remote instance from the SessionContext + pub fn remote(url: &str, py: Python) -> PyResult { let session_context = SessionContext::remote(url); let ctx = wait_for_future(py, session_context)?; - + + // Convert the SessionContext into a Python SessionContext Ok(ctx.into()) } -} +} \ No newline at end of file From 31a16107e595a290e61e5f195eee498ac5c9edce Mon Sep 17 00:00:00 2001 From: Trevor Barnes Date: Thu, 7 Nov 2024 12:57:13 -0800 Subject: [PATCH 16/33] updated python to have two static methods for creating a ballista context --- python/README.md | 4 ++-- python/ballista/context.py | 4 ++++ python/examples/example.py | 8 ++++++-- python/src/lib.rs | 17 +++++++++++++++-- 4 files changed, 27 insertions(+), 6 deletions(-) diff --git a/python/README.md b/python/README.md index b5f6b787d..c9f92eaa0 100644 --- a/python/README.md +++ b/python/README.md @@ -29,8 +29,8 @@ part of the default Cargo workspace so that it doesn't cause overhead for mainta Creates a new context and connects to a Ballista scheduler process. 
```python -from ballista import RemoteBallista, StandaloneBallista ->>> ctx = SessionContext("localhost", 50050) +from ballista import Ballista +>>> ctx = Ballista.standalone() ``` ## Example SQL Usage diff --git a/python/ballista/context.py b/python/ballista/context.py index 4854e8ad3..08948d1d6 100644 --- a/python/ballista/context.py +++ b/python/ballista/context.py @@ -23,6 +23,10 @@ class Ballista: def __init__(self): self.ctx = ballista.Ballista + self.conf = {} + + def config(self, conf: dict = {}): + self.conf = conf def standalone(self): return self.ctx.standalone() diff --git a/python/examples/example.py b/python/examples/example.py index 070bf3d7c..cd642a6cc 100644 --- a/python/examples/example.py +++ b/python/examples/example.py @@ -15,11 +15,15 @@ # specific language governing permissions and limitations # under the License. -from ballista import StandaloneBallista, RemoteBallista, Ballista +from ballista import Ballista from datafusion.context import SessionContext +# Define the Ballista configuration +conf = {"example": "example"} +ballista = Ballista.config(conf) + # Create the Ballista Context [standalone or remote] -ctx: SessionContext = Ballista.standalone() # Ballista.remote() +ctx: SessionContext = ballista.standalone() # Ballista.remote() # Register our parquet file to perform SQL operations ctx.register_parquet("test_parquet", "./testdata/test.parquet") diff --git a/python/src/lib.rs b/python/src/lib.rs index 9564e6835..5a64c6529 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -19,6 +19,9 @@ use ballista::prelude::*; use datafusion::prelude::*; use datafusion_python::context::PySessionContext as DataFusionPythonSessionContext; use datafusion_python::utils::wait_for_future; +use pyo3::types::{IntoPyDict, PyDict}; +use std::collections::HashMap; + use pyo3::prelude::*; mod utils; @@ -33,10 +36,20 @@ fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { } #[pyclass(name = "Ballista", module = "ballista", subclass)] 
-pub struct PyBallista; +pub struct PyBallista(pub Option>); #[pymethods] -impl PyBallista { +impl PyBallista { + #[staticmethod] + #[pyo3(signature = (config=None))] + pub fn config(config: Option>) -> Self { + if let Some(conf) = config { + Self(Some(conf)) + } else { + Self(None) + } + } + #[staticmethod] /// Construct the standalone instance from the SessionContext pub fn standalone(py: Python) -> PyResult { From 93a9eea3b82210a96df88d803989c96ca56d9446 Mon Sep 17 00:00:00 2001 From: Trevor Barnes Date: Thu, 7 Nov 2024 12:58:49 -0800 Subject: [PATCH 17/33] updated python to have two static methods for creating a ballista context --- python/src/lib.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/python/src/lib.rs b/python/src/lib.rs index 5a64c6529..a90a3de92 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -19,7 +19,6 @@ use ballista::prelude::*; use datafusion::prelude::*; use datafusion_python::context::PySessionContext as DataFusionPythonSessionContext; use datafusion_python::utils::wait_for_future; -use pyo3::types::{IntoPyDict, PyDict}; use std::collections::HashMap; use pyo3::prelude::*; @@ -49,7 +48,7 @@ impl PyBallista { Self(None) } } - + #[staticmethod] /// Construct the standalone instance from the SessionContext pub fn standalone(py: Python) -> PyResult { @@ -57,18 +56,18 @@ impl PyBallista { let session_context = SessionContext::standalone(); // SessionContext is an async function let ctx = wait_for_future(py, session_context).unwrap(); - + // Convert the SessionContext into a Python SessionContext Ok(ctx.into()) } - + #[staticmethod] /// Construct the remote instance from the SessionContext pub fn remote(url: &str, py: Python) -> PyResult { let session_context = SessionContext::remote(url); let ctx = wait_for_future(py, session_context)?; - + // Convert the SessionContext into a Python SessionContext Ok(ctx.into()) } -} \ No newline at end of file +} From c71fa31baa17260e9791f702eeaf008dc60fd587 Mon Sep 17 
00:00:00 2001 From: Trevor Barnes Date: Fri, 8 Nov 2024 15:59:47 -0800 Subject: [PATCH 18/33] updated python to have two static methods for creating a ballista context --- docs/source/user-guide/python.md | 4 ++-- python/ballista/context.py | 9 ++++----- python/examples/example.py | 9 ++++++--- python/src/lib.rs | 21 ++++++++++++++------- 4 files changed, 26 insertions(+), 17 deletions(-) diff --git a/docs/source/user-guide/python.md b/docs/source/user-guide/python.md index 5f7a1ecff..10eaa3e08 100644 --- a/docs/source/user-guide/python.md +++ b/docs/source/user-guide/python.md @@ -28,7 +28,7 @@ popular file formats files, run it in a distributed environment, and obtain the The following code demonstrates how to create a Ballista context and connect to a scheduler. -If you are running a standalone cluster (runs locally), all you need to do is call the stand alone cluster method `standalone()` or your BallistaContext. If you are running a cluster in remote mode, you need to provide the URL `BallistaContext.remote("http://my-remote-ip:50050")`. +If you are running a standalone cluster (runs locally), all you need to do is call the stand alone cluster method `standalone()` or your BallistaContext. If you are running a cluster in remote mode, you need to provide the URL `Ballista.remote("http://my-remote-ip:50050")`. 
```text >>> from ballista import Ballista @@ -117,7 +117,7 @@ import pyarrow f = ballista.functions # create a context -ctx = StandaloneBallista.build() +ctx = Ballista.standalone() # create a RecordBatch and a new DataFrame from it batch = pyarrow.RecordBatch.from_arrays( diff --git a/python/ballista/context.py b/python/ballista/context.py index 08948d1d6..b4a5f1cac 100644 --- a/python/ballista/context.py +++ b/python/ballista/context.py @@ -21,12 +21,11 @@ from typing import List, Any class Ballista: - def __init__(self): - self.ctx = ballista.Ballista - self.conf = {} + def __new__(self, conf = None): + self.ctx = ballista.Ballista(conf) - def config(self, conf: dict = {}): - self.conf = conf + def config(self, conf = None): + self.ctx = ballista.Ballista(conf) def standalone(self): return self.ctx.standalone() diff --git a/python/examples/example.py b/python/examples/example.py index cd642a6cc..821dbf90b 100644 --- a/python/examples/example.py +++ b/python/examples/example.py @@ -18,9 +18,12 @@ from ballista import Ballista from datafusion.context import SessionContext -# Define the Ballista configuration -conf = {"example": "example"} -ballista = Ballista.config(conf) +# Ballista will initiate with an empty config +ballista = Ballista() + +# But you can also set your own config +conf = [("example", "example")] +ballista.config(conf) # Create the Ballista Context [standalone or remote] ctx: SessionContext = ballista.standalone() # Ballista.remote() diff --git a/python/src/lib.rs b/python/src/lib.rs index a90a3de92..cc40dca9a 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -35,17 +35,24 @@ fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { } #[pyclass(name = "Ballista", module = "ballista", subclass)] -pub struct PyBallista(pub Option>); +pub struct PyBallista(pub Option>); #[pymethods] impl PyBallista { - #[staticmethod] + #[new] + #[pyo3(signature = (config=None))] + pub fn new(config: Option>) -> Self { + match config { + 
Some(config) => Self(Some(config)), + None => Self(Some(vec![(String::from(""), String::from(""))])) + } + } + #[pyo3(signature = (config=None))] - pub fn config(config: Option>) -> Self { - if let Some(conf) = config { - Self(Some(conf)) - } else { - Self(None) + pub fn config(&mut self, config: Option>) -> Self { + match config { + Some(config) => Self(Some(config)), + None => Self(Some(vec![(String::from(""), String::from(""))])) } } From 62e692b88f9b4b97308a2071a1db7a297258af5b Mon Sep 17 00:00:00 2001 From: tbar4 Date: Mon, 11 Nov 2024 15:05:49 -0800 Subject: [PATCH 19/33] Updating BallistaContext and Config --- python/ballista/__init__.py | 8 +- python/ballista/context.py | 115 ++++++++++++++++++++-- python/examples/example.py | 16 +++- python/src/lib.rs | 186 +++++++++++++++++++++++++++++++++--- 4 files changed, 298 insertions(+), 27 deletions(-) diff --git a/python/ballista/__init__.py b/python/ballista/__init__.py index 82655d9b1..fb4b1be7e 100644 --- a/python/ballista/__init__.py +++ b/python/ballista/__init__.py @@ -26,11 +26,15 @@ import pyarrow as pa from .ballista_internal import ( - Ballista + Ballista, + BallistaConfig, + BallistaConfigBuilder ) __version__ = importlib_metadata.version(__name__) __all__ = [ - "Ballista" + "Ballista", + "BallistaConfig", + "BallistaConfigBuilder" ] \ No newline at end of file diff --git a/python/ballista/context.py b/python/ballista/context.py index b4a5f1cac..5e89b1742 100644 --- a/python/ballista/context.py +++ b/python/ballista/context.py @@ -15,20 +15,117 @@ # specific language governing permissions and limitations # under the License. 
+from _typeshed import Self from datafusion import SessionContext -import ballista +from ballista import Ballista, BallistaConfig, BallistaConfigBuilder from typing import List, Any + +class BallistaConfigBuilder: + def __init__(self) -> None: + pass + + def set(self, k: str, v: str): + BallistaConfigBuilder.set(self, k, v) + + def build() -> BallistaConfig: + BallistaConfig() + +class BallistaConfig: + def __init__(self): + self.config = self.with_settings({}) + + def builder(self) -> BallistaConfig: + builder = self.builder() + + return builder + + def with_settings(self, settings: dict) -> BallistaConfig: + self.with_settings(settings) + + return self + + def settings(self) -> None: + self.settings() + + def default_shuffle_partitions(self): + self.default_shuffle_partitions() + + def default_batch_size(self): + self.default_batch_size() + + def hash_join_single_partition_threshold(self): + self.hash_join_single_partition_threshold() + + def default_grpc_client_max_message_size(self): + self.default_grpc_client_max_message_size() + + def repartition_joins(self): + self.repartition_joins() + + def repartition_aggregations(self): + self.repartition_aggregations() + + def repartition_windows(self): + self.repartition_windows() + + def parquet_pruning(self): + self.parquet_pruning() + + def collect_statistics(self): + self.collect_statistics() + + def default_standalone_parallelism(self): + self.default_standalone_parallelism() + + def default_with_information_schema(self): + self.default_with_information_schema() class Ballista: - def __new__(self, conf = None): - self.ctx = ballista.Ballista(conf) + def __init__(self): + self.config = BallistaConfig() + + def configuration(self, settings: dict): + self.config = BallistaConfig().builder().with_settings(settings) + + def standalone(self) -> SessionContext: + return self.standalone() + + def remote(self, url: str) -> SessionContext: + return self.remote(url) + + def settings(self): + self.config.settings() + + def 
default_shuffle_partitions(self): + self.config.default_shuffle_partitions() + + def default_batch_size(self): + self.config.default_batch_size() + + def hash_join_single_partition_threshold(self): + self.config.hash_join_single_partition_threshold() + + def default_grpc_client_max_message_size(self): + self.config.default_grpc_client_max_message_size() + + def repartition_joins(self): + self.config.repartition_joins() + + def repartion_aggregations(self): + self.config.repartition_aggregations() + + def repartition_windows(self): + self.config.repartition_windows() + + def parquet_pruning(self): + self.config.parquet_pruning() - def config(self, conf = None): - self.ctx = ballista.Ballista(conf) + def collect_statistics(self): + self.config.collect_statistics() - def standalone(self): - return self.ctx.standalone() + def default_standalone_parallelism(self): + self.config.default_standalone_parallelism() - def remote(self, url: str): - return self.ctx.remote(url) \ No newline at end of file + def default_with_information_schema(self): + self.config.default_with_information_schema() \ No newline at end of file diff --git a/python/examples/example.py b/python/examples/example.py index 821dbf90b..39f07360a 100644 --- a/python/examples/example.py +++ b/python/examples/example.py @@ -15,16 +15,26 @@ # specific language governing permissions and limitations # under the License. 
-from ballista import Ballista +from ballista import Ballista, BallistaConfig, BallistaConfigBuilder from datafusion.context import SessionContext # Ballista will initiate with an empty config ballista = Ballista() +config = BallistaConfig() + +# Define custom settings +job_settings = { + "BALLISTA_JOB_NAME": "Example Ballista Job", + "DEFAULT_SHUFFLE_PARTITIONS": "2" +} + +ballista.configuration(job_settings) # But you can also set your own config -conf = [("example", "example")] -ballista.config(conf) +print("New Ballista Config: ", ballista.settings()) +# Or you can check default settings in BallistaConfig +print("Default Shuffle Partitions: ", ballista.default_shuffle_partitions()) # Create the Ballista Context [standalone or remote] ctx: SessionContext = ballista.standalone() # Ballista.remote() diff --git a/python/src/lib.rs b/python/src/lib.rs index cc40dca9a..ba0a1e270 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -15,11 +15,13 @@ // specific language governing permissions and limitations // under the License. 
+use std::collections::HashMap; + use ballista::prelude::*; +use ballista_core::config::BallistaConfigBuilder; use datafusion::prelude::*; use datafusion_python::context::PySessionContext as DataFusionPythonSessionContext; use datafusion_python::utils::wait_for_future; -use std::collections::HashMap; use pyo3::prelude::*; mod utils; @@ -31,29 +33,138 @@ fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; // DataFusion structs m.add_class::()?; + // Ballista Config + m.add_class::()?; + m.add_class::()?; Ok(()) } +/// Ballista configuration builder +#[pyclass(name = "BallistaConfigBuilder", module = "ballista", subclass)] +pub struct PyBallistaConfigBuilder { + settings: HashMap, +} + +#[pymethods] +impl PyBallistaConfigBuilder { + #[new] + pub fn new() -> Self { + Self { + settings: HashMap::new() + } + } + /// Create a new config with an additional setting + pub fn set(&self, k: &str, v: &str) -> Self { + let mut settings = self.settings.clone(); + settings.insert(k.to_owned(), v.to_owned()); + Self { settings } + } + + #[staticmethod] + pub fn build() -> PyBallistaConfig { + PyBallistaConfig::new() + } +} + +#[pyclass(name = "BallistaConfig", module = "ballista", subclass)] +pub struct PyBallistaConfig { + // Settings stored in map for easy serde + pub config: BallistaConfig, +} + +#[pymethods] +impl PyBallistaConfig { + #[new] + pub fn new() -> Self { + Self { + config: BallistaConfig::with_settings(HashMap::new()).unwrap() + } + } + + #[staticmethod] + pub fn with_settings(settings: HashMap) -> Self { + let settings = BallistaConfig::with_settings(settings).unwrap(); + + Self { + config: settings + } + } + + #[staticmethod] + pub fn builder() -> PyBallistaConfigBuilder { + PyBallistaConfigBuilder { + settings: HashMap::new() + } + } + + pub fn settings(&self) -> HashMap { + let settings = &self.config.settings().clone(); + settings.to_owned() + } + + pub fn default_shuffle_partitions(&self) -> usize { + 
self.config.default_shuffle_partitions() + } + + pub fn default_batch_size(&self) -> usize { + self.config.default_batch_size() + } + + pub fn hash_join_single_partition_threshold(&self) -> usize { + self.config.hash_join_single_partition_threshold() + } + + pub fn default_grpc_client_max_message_size(&self) -> usize { + self.config.default_grpc_client_max_message_size() + } + + pub fn repartition_joins(&self) -> bool { + self.config.repartition_joins() + } + + pub fn repartition_aggregations(&self) -> bool { + self.config.repartition_aggregations() + } + + pub fn repartition_windows(&self) -> bool { + self.config.repartition_windows() + } + + pub fn parquet_pruning(&self) -> bool { + self.config.parquet_pruning() + } + + pub fn collect_statistics(&self) -> bool { + self.config.collect_statistics() + } + + pub fn default_standalone_parallelism(&self) -> usize { + self.config.default_standalone_parallelism() + } + + pub fn default_with_information_schema(&self) -> bool { + self.config.default_with_information_schema() + } +} + #[pyclass(name = "Ballista", module = "ballista", subclass)] -pub struct PyBallista(pub Option>); +pub struct PyBallista { + pub config: PyBallistaConfig +} #[pymethods] impl PyBallista { #[new] - #[pyo3(signature = (config=None))] - pub fn new(config: Option>) -> Self { - match config { - Some(config) => Self(Some(config)), - None => Self(Some(vec![(String::from(""), String::from(""))])) + pub fn new() -> Self { + Self { + config: PyBallistaConfig::new() } } - #[pyo3(signature = (config=None))] - pub fn config(&mut self, config: Option>) -> Self { - match config { - Some(config) => Self(Some(config)), - None => Self(Some(vec![(String::from(""), String::from(""))])) - } + pub fn configuration(&mut self, settings: HashMap) { + let settings = BallistaConfig::with_settings(settings).expect("Non-Valid entries"); + let ballista_config = PyBallistaConfig { config: settings }; + self.config = ballista_config } #[staticmethod] @@ -77,4 +188,53 @@ impl 
PyBallista { // Convert the SessionContext into a Python SessionContext Ok(ctx.into()) } + + pub fn settings(&self) -> HashMap { + let settings = &self.config.settings(); + settings.to_owned() + } + + pub fn default_shuffle_partitions(&self) -> usize { + self.config.default_shuffle_partitions() + } + + pub fn default_batch_size(&self) -> usize { + self.config.default_batch_size() + } + + pub fn hash_join_single_partition_threshold(&self) -> usize { + self.config.hash_join_single_partition_threshold() + } + + pub fn default_grpc_client_max_message_size(&self) -> usize { + self.config.default_grpc_client_max_message_size() + } + + pub fn repartition_joins(&self) -> bool { + self.config.repartition_joins() + } + + pub fn repartition_aggregations(&self) -> bool { + self.config.repartition_aggregations() + } + + pub fn repartition_windows(&self) -> bool { + self.config.repartition_windows() + } + + pub fn parquet_pruning(&self) -> bool { + self.config.parquet_pruning() + } + + pub fn collect_statistics(&self) -> bool { + self.config.collect_statistics() + } + + pub fn default_standalone_parallelism(&self) -> usize { + self.config.default_standalone_parallelism() + } + + pub fn default_with_information_schema(&self) -> bool { + self.config.default_with_information_schema() + } } From a92a568c631d0fc854880ac171517d6222426cac Mon Sep 17 00:00:00 2001 From: tbar4 Date: Tue, 12 Nov 2024 10:26:56 -0800 Subject: [PATCH 20/33] Updating BallistaContext and Config --- python/src/lib.rs | 188 +++++++++------------------------------------- 1 file changed, 35 insertions(+), 153 deletions(-) diff --git a/python/src/lib.rs b/python/src/lib.rs index ba0a1e270..3bed4c35d 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. 
-use std::collections::HashMap; - +use ballista::extension::SessionConfigExt; use ballista::prelude::*; -use ballista_core::config::BallistaConfigBuilder; +use datafusion::execution::SessionStateBuilder; use datafusion::prelude::*; use datafusion_python::context::PySessionContext as DataFusionPythonSessionContext; use datafusion_python::utils::wait_for_future; +use std::cell::RefCell; use pyo3::prelude::*; mod utils; @@ -34,122 +34,56 @@ fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { // DataFusion structs m.add_class::()?; // Ballista Config - m.add_class::()?; - m.add_class::()?; + m.add_class::()?; + m.add_class::()?; Ok(()) } -/// Ballista configuration builder -#[pyclass(name = "BallistaConfigBuilder", module = "ballista", subclass)] -pub struct PyBallistaConfigBuilder { - settings: HashMap, +/// Ballista Session Extension builder +#[pyclass(name = "SessionConfig", module = "ballista", subclass)] +#[derive(Clone)] +pub struct PySessionConfig { + pub session_config: SessionConfig, } #[pymethods] -impl PyBallistaConfigBuilder { +impl PySessionConfig { #[new] pub fn new() -> Self { - Self { - settings: HashMap::new() - } - } - /// Create a new config with an additional setting - pub fn set(&self, k: &str, v: &str) -> Self { - let mut settings = self.settings.clone(); - settings.insert(k.to_owned(), v.to_owned()); - Self { settings } + let session_config = SessionConfig::new_with_ballista(); + + Self { session_config } } - - #[staticmethod] - pub fn build() -> PyBallistaConfig { - PyBallistaConfig::new() + + pub fn set_str(&mut self, key: &str, value: &str) -> Self { + self.session_config.options_mut().set(key, value); + + self.clone() } } -#[pyclass(name = "BallistaConfig", module = "ballista", subclass)] -pub struct PyBallistaConfig { - // Settings stored in map for easy serde - pub config: BallistaConfig, +#[pyclass(name = "SessionStateBuilder", module = "ballista", subclass)] +pub struct PySessionStateBuilder { + pub state: RefCell, 
} #[pymethods] -impl PyBallistaConfig { +impl PySessionStateBuilder { #[new] - pub fn new() -> Self { - Self { - config: BallistaConfig::with_settings(HashMap::new()).unwrap() - } - } - - #[staticmethod] - pub fn with_settings(settings: HashMap) -> Self { - let settings = BallistaConfig::with_settings(settings).unwrap(); - + pub fn new() -> Self { Self { - config: settings - } - } - - #[staticmethod] - pub fn builder() -> PyBallistaConfigBuilder { - PyBallistaConfigBuilder { - settings: HashMap::new() + state: RefCell::new(SessionStateBuilder::new()), } } - - pub fn settings(&self) -> HashMap { - let settings = &self.config.settings().clone(); - settings.to_owned() - } - - pub fn default_shuffle_partitions(&self) -> usize { - self.config.default_shuffle_partitions() - } - - pub fn default_batch_size(&self) -> usize { - self.config.default_batch_size() - } - - pub fn hash_join_single_partition_threshold(&self) -> usize { - self.config.hash_join_single_partition_threshold() - } - - pub fn default_grpc_client_max_message_size(&self) -> usize { - self.config.default_grpc_client_max_message_size() - } - - pub fn repartition_joins(&self) -> bool { - self.config.repartition_joins() - } - - pub fn repartition_aggregations(&self) -> bool { - self.config.repartition_aggregations() - } - - pub fn repartition_windows(&self) -> bool { - self.config.repartition_windows() - } - pub fn parquet_pruning(&self) -> bool { - self.config.parquet_pruning() - } - - pub fn collect_statistics(&self) -> bool { - self.config.collect_statistics() - } - - pub fn default_standalone_parallelism(&self) -> usize { - self.config.default_standalone_parallelism() - } - - pub fn default_with_information_schema(&self) -> bool { - self.config.default_with_information_schema() + pub fn with_config(slf: PyRefMut<'_, Self>, config: PySessionConfig) { + slf.state.take().with_config(config.session_config); } } #[pyclass(name = "Ballista", module = "ballista", subclass)] pub struct PyBallista { - pub config: 
PyBallistaConfig + pub state: RefCell, } #[pymethods] @@ -157,21 +91,18 @@ impl PyBallista { #[new] pub fn new() -> Self { Self { - config: PyBallistaConfig::new() + state: RefCell::new(SessionStateBuilder::new()), } } - - pub fn configuration(&mut self, settings: HashMap) { - let settings = BallistaConfig::with_settings(settings).expect("Non-Valid entries"); - let ballista_config = PyBallistaConfig { config: settings }; - self.config = ballista_config - } - #[staticmethod] /// Construct the standalone instance from the SessionContext - pub fn standalone(py: Python) -> PyResult { + pub fn standalone( + slf: PyRef<'_, Self>, + py: Python, + ) -> PyResult { + let take_state = slf.state.take().build(); // Define the SessionContext - let session_context = SessionContext::standalone(); + let session_context = SessionContext::standalone_with_state(take_state); // SessionContext is an async function let ctx = wait_for_future(py, session_context).unwrap(); @@ -188,53 +119,4 @@ impl PyBallista { // Convert the SessionContext into a Python SessionContext Ok(ctx.into()) } - - pub fn settings(&self) -> HashMap { - let settings = &self.config.settings(); - settings.to_owned() - } - - pub fn default_shuffle_partitions(&self) -> usize { - self.config.default_shuffle_partitions() - } - - pub fn default_batch_size(&self) -> usize { - self.config.default_batch_size() - } - - pub fn hash_join_single_partition_threshold(&self) -> usize { - self.config.hash_join_single_partition_threshold() - } - - pub fn default_grpc_client_max_message_size(&self) -> usize { - self.config.default_grpc_client_max_message_size() - } - - pub fn repartition_joins(&self) -> bool { - self.config.repartition_joins() - } - - pub fn repartition_aggregations(&self) -> bool { - self.config.repartition_aggregations() - } - - pub fn repartition_windows(&self) -> bool { - self.config.repartition_windows() - } - - pub fn parquet_pruning(&self) -> bool { - self.config.parquet_pruning() - } - - pub fn 
collect_statistics(&self) -> bool { - self.config.collect_statistics() - } - - pub fn default_standalone_parallelism(&self) -> usize { - self.config.default_standalone_parallelism() - } - - pub fn default_with_information_schema(&self) -> bool { - self.config.default_with_information_schema() - } } From ca9d60d87a6d24d17a6d34ecdc43118f7e0ff1bb Mon Sep 17 00:00:00 2001 From: Trevor Barnes Date: Tue, 12 Nov 2024 16:43:46 -0800 Subject: [PATCH 21/33] updated python to have two static methods for creating a ballista context --- python/ballista/__init__.py | 10 +-- python/ballista/context.py | 122 ++++++++---------------------------- python/examples/example.py | 17 ++++- python/src/lib.rs | 27 ++++++-- 4 files changed, 70 insertions(+), 106 deletions(-) diff --git a/python/ballista/__init__.py b/python/ballista/__init__.py index fb4b1be7e..d9f4c816a 100644 --- a/python/ballista/__init__.py +++ b/python/ballista/__init__.py @@ -27,14 +27,16 @@ from .ballista_internal import ( Ballista, - BallistaConfig, - BallistaConfigBuilder + SessionConfig, + SessionStateBuilder, + SessionState ) __version__ = importlib_metadata.version(__name__) __all__ = [ "Ballista", - "BallistaConfig", - "BallistaConfigBuilder" + "SessionConfig", + "SessionStateBuilder", + "SessionState" ] \ No newline at end of file diff --git a/python/ballista/context.py b/python/ballista/context.py index 5e89b1742..e6da21cce 100644 --- a/python/ballista/context.py +++ b/python/ballista/context.py @@ -17,115 +17,45 @@ from _typeshed import Self from datafusion import SessionContext -from ballista import Ballista, BallistaConfig, BallistaConfigBuilder +from ballista import SessionConfig, SessionStateBuilder, SessionState, Ballista from typing import List, Any -class BallistaConfigBuilder: - def __init__(self) -> None: - pass - - def set(self, k: str, v: str): - BallistaConfigBuilder.set(self, k, v) +class SessionConfig: + def __new__(cls): + return super().__new__(cls) - def build() -> BallistaConfig: - 
BallistaConfig() - -class BallistaConfig: def __init__(self): - self.config = self.with_settings({}) - - def builder(self) -> BallistaConfig: - builder = self.builder() - - return builder - - def with_settings(self, settings: dict) -> BallistaConfig: - self.with_settings(settings) - - return self + self.session_config = SessionConfig() - def settings(self) -> None: - self.settings() + def set_str(self, key: str, value: str): + self.session_config.set_str(key, value) - def default_shuffle_partitions(self): - self.default_shuffle_partitions() - - def default_batch_size(self): - self.default_batch_size() +class SessionStateBuilder: + def __new__(cls): + return super().__new__(cls) - def hash_join_single_partition_threshold(self): - self.hash_join_single_partition_threshold() - - def default_grpc_client_max_message_size(self): - self.default_grpc_client_max_message_size() - - def repartition_joins(self): - self.repartition_joins() - - def repartition_aggregations(self): - self.repartition_aggregations() - - def repartition_windows(self): - self.repartition_windows() + def __init__(self) -> None: + self.state = SessionStateBuilder() - def parquet_pruning(self): - self.parquet_pruning() + def with_config(self, config: SessionConfig) -> SessionStateBuilder: + self.with_config(config) - def collect_statistics(self): - self.collect_statistics() + return self - def default_standalone_parallelism(self): - self.default_standalone_parallelism() + def build(self) -> SessionState: + self.build() - def default_with_information_schema(self): - self.default_with_information_schema() +class SessionState: + def __new__(cls): + return super().__new__(cls) class Ballista: - def __init__(self): - self.config = BallistaConfig() - - def configuration(self, settings: dict): - self.config = BallistaConfig().builder().with_settings(settings) - - def standalone(self) -> SessionContext: - return self.standalone() - - def remote(self, url: str) -> SessionContext: - return self.remote(url) - - 
def settings(self): - self.config.settings() - - def default_shuffle_partitions(self): - self.config.default_shuffle_partitions() - - def default_batch_size(self): - self.config.default_batch_size() - - def hash_join_single_partition_threshold(self): - self.config.hash_join_single_partition_threshold() + def __new__(cls): + return super().__new__(cls) - def default_grpc_client_max_message_size(self): - self.config.default_grpc_client_max_message_size() - - def repartition_joins(self): - self.config.repartition_joins() - - def repartion_aggregations(self): - self.config.repartition_aggregations() - - def repartition_windows(self): - self.config.repartition_windows() - - def parquet_pruning(self): - self.config.parquet_pruning() - - def collect_statistics(self): - self.config.collect_statistics() - - def default_standalone_parallelism(self): - self.config.default_standalone_parallelism() + def __init__(self) -> None: + self.state = Ballista() - def default_with_information_schema(self): - self.config.default_with_information_schema() \ No newline at end of file + def standalone(self): + self.standalone() \ No newline at end of file diff --git a/python/examples/example.py b/python/examples/example.py index 39f07360a..00caaf777 100644 --- a/python/examples/example.py +++ b/python/examples/example.py @@ -15,13 +15,25 @@ # specific language governing permissions and limitations # under the License. 
-from ballista import Ballista, BallistaConfig, BallistaConfigBuilder +from ballista import Ballista, SessionConfig, SessionStateBuilder from datafusion.context import SessionContext # Ballista will initiate with an empty config ballista = Ballista() -config = BallistaConfig() +config = SessionConfig()\ + .set_str("BALLISTA_DEFAULT_SHUFFLE_PARTITIONS", "4") + +# Build the state +state = SessionStateBuilder()\ + .with_config(config)\ + .build() +# Create the context +ctx: SessionContext = Ballista().standalone() + +ctx.sql("SELECT 1").show() + +""" # Define custom settings job_settings = { "BALLISTA_JOB_NAME": "Example Ballista Job", @@ -52,3 +64,4 @@ # Show the dataframe test_csv.show() +""" diff --git a/python/src/lib.rs b/python/src/lib.rs index 3bed4c35d..cb206e3a1 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -17,10 +17,13 @@ use ballista::extension::SessionConfigExt; use ballista::prelude::*; -use datafusion::execution::SessionStateBuilder; +use ballista_core::utils::SessionStateExt; +use datafusion::catalog::Session; +use datafusion::execution::{SessionState, SessionStateBuilder}; use datafusion::prelude::*; use datafusion_python::context::PySessionContext as DataFusionPythonSessionContext; use datafusion_python::utils::wait_for_future; +use std::borrow::BorrowMut; use std::cell::RefCell; use pyo3::prelude::*; @@ -35,6 +38,7 @@ fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; // Ballista Config m.add_class::()?; + m.add_class::()?; m.add_class::()?; Ok(()) } @@ -76,8 +80,18 @@ impl PySessionStateBuilder { } } - pub fn with_config(slf: PyRefMut<'_, Self>, config: PySessionConfig) { - slf.state.take().with_config(config.session_config); + pub fn with_config(&mut self, config: PySessionConfig) -> PySessionStateBuilder { + let state = self.state.take().with_config(config.session_config); + + PySessionStateBuilder { + state: state.into() + } + } + + pub fn build(&mut self) -> PySessionStateBuilder { + 
PySessionStateBuilder { + state: RefCell::new(self.state.take()) + } } } @@ -94,13 +108,18 @@ impl PyBallista { state: RefCell::new(SessionStateBuilder::new()), } } + + pub fn update_state(&mut self, state: &PyCell) { + self.state = state.borrow_mut().state + } /// Construct the standalone instance from the SessionContext pub fn standalone( slf: PyRef<'_, Self>, + state: PySessionStateBuilder, py: Python, ) -> PyResult { - let take_state = slf.state.take().build(); + let take_state = state.take().build(); // Define the SessionContext let session_context = SessionContext::standalone_with_state(take_state); // SessionContext is an async function From 407540afc6f4bae289a86633a3e723f20b740ab2 Mon Sep 17 00:00:00 2001 From: tbar4 Date: Thu, 14 Nov 2024 21:59:26 -0800 Subject: [PATCH 22/33] Updating BallistaContext and Config, calling it for the night, will complete tomorrow --- python/ballista/__init__.py | 14 ++-- python/ballista/context.py | 48 +++++++++---- python/examples/example.py | 19 ++--- python/src/lib.rs | 139 ++++++++++++++++++++++++------------ 4 files changed, 143 insertions(+), 77 deletions(-) diff --git a/python/ballista/__init__.py b/python/ballista/__init__.py index d9f4c816a..99486ae55 100644 --- a/python/ballista/__init__.py +++ b/python/ballista/__init__.py @@ -27,16 +27,18 @@ from .ballista_internal import ( Ballista, - SessionConfig, - SessionStateBuilder, - SessionState + BallistaBuilder, + #SessionConfig, + #SessionStateBuilder, + #SessionState ) __version__ = importlib_metadata.version(__name__) __all__ = [ "Ballista", - "SessionConfig", - "SessionStateBuilder", - "SessionState" + "BallistaBuilder", + #"SessionConfig", + #"SessionStateBuilder", + #"SessionState" ] \ No newline at end of file diff --git a/python/ballista/context.py b/python/ballista/context.py index e6da21cce..a09451438 100644 --- a/python/ballista/context.py +++ b/python/ballista/context.py @@ -17,10 +17,41 @@ from _typeshed import Self from datafusion import SessionContext 
-from ballista import SessionConfig, SessionStateBuilder, SessionState, Ballista +from ballista import Ballista, BallistaBuilder from typing import List, Any + +class BallistaBuilder: + def __new__(cls): + return super().__new__(cls) + + def __init__(self) -> None: + self = {} + + def set(self, k, v) -> Self: + return self.set(k, v) + + +class Ballista: + def __new__(cls): + return super().__new__(cls) + + def __init__(self) -> None: + self.state = Ballista() + + def standalone(self): + self.standalone() + + def builder(self) -> BallistaBuilder: + return BallistaBuilder() + +""" +### Future State Implementation +class SessionState: + def __new__(cls): + return super().__new__(cls) + class SessionConfig: def __new__(cls): return super().__new__(cls) @@ -45,17 +76,4 @@ def with_config(self, config: SessionConfig) -> SessionStateBuilder: def build(self) -> SessionState: self.build() - -class SessionState: - def __new__(cls): - return super().__new__(cls) - -class Ballista: - def __new__(cls): - return super().__new__(cls) - - def __init__(self) -> None: - self.state = Ballista() - - def standalone(self): - self.standalone() \ No newline at end of file +""" \ No newline at end of file diff --git a/python/examples/example.py b/python/examples/example.py index 00caaf777..dcaa3a087 100644 --- a/python/examples/example.py +++ b/python/examples/example.py @@ -15,25 +15,26 @@ # specific language governing permissions and limitations # under the License. 
-from ballista import Ballista, SessionConfig, SessionStateBuilder +from ballista import Ballista, BallistaBuilder from datafusion.context import SessionContext # Ballista will initiate with an empty config -ballista = Ballista() -config = SessionConfig()\ - .set_str("BALLISTA_DEFAULT_SHUFFLE_PARTITIONS", "4") - -# Build the state -state = SessionStateBuilder()\ - .with_config(config)\ +ballista = BallistaBuilder()\ + .set("ballista.job.name", "example ballista")\ + .set("ballista.shuffle.partitions", "4")\ + .set("ballista.executor.cpus", "4")\ .build() + +print(ballista) +print(ballista.show_config()) +""" # Create the context ctx: SessionContext = Ballista().standalone() ctx.sql("SELECT 1").show() -""" + # Define custom settings job_settings = { "BALLISTA_JOB_NAME": "Example Ballista Job", diff --git a/python/src/lib.rs b/python/src/lib.rs index cb206e3a1..48ab40e56 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -23,8 +23,10 @@ use datafusion::execution::{SessionState, SessionStateBuilder}; use datafusion::prelude::*; use datafusion_python::context::PySessionContext as DataFusionPythonSessionContext; use datafusion_python::utils::wait_for_future; -use std::borrow::BorrowMut; -use std::cell::RefCell; + +use std::collections::HashMap; +use std::fmt::Formatter; +use std::path::Display; use pyo3::prelude::*; mod utils; @@ -34,15 +36,102 @@ fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { pyo3_log::init(); // Ballista structs m.add_class::()?; + m.add_class::()?; // DataFusion structs m.add_class::()?; // Ballista Config + /* + // Future implementation will include more state and config options m.add_class::()?; m.add_class::()?; m.add_class::()?; + */ Ok(()) } +// Ballista Builder will take a HasMap/Dict Cionfg +#[pyclass(name = "BallistaBuilder", module = "ballista", subclass)] +pub struct PyBallistaBuilder(HashMap); + +#[pymethods] +impl PyBallistaBuilder { + #[new] + pub fn new() -> Self { + Self(HashMap::new()) + } + + pub 
fn set(mut slf: PyRefMut<'_, Self>, k: &str, v: &str, py: Python) -> PyResult { + slf.0.insert(k.into(), v.into()); + + Ok(slf.into_py(py)) + } + + pub fn show_config(&self) { + println!("Ballista Config:"); + for ele in self.0.iter() { + println!(" {}: {}", ele.0, ele.1) + } + } + + pub fn build(slf: PyRef<'_, Self>) -> PyBallista { + PyBallista { + conf: PyBallistaBuilder(slf.0.clone()) + } + } +} + +#[pyclass(name = "Ballista", module = "ballista", subclass)] +pub struct PyBallista { + pub conf: PyBallistaBuilder, +} + +#[pymethods] +impl PyBallista { + #[new] + pub fn new() -> Self { + Self { + conf: PyBallistaBuilder::new(), + } + } + + pub fn show_config(&self) { + println!("Ballista Config:"); + for ele in self.conf.0.clone() { + println!("{:4}: {}", ele.0, ele.1) + } + } + + /// Construct the standalone instance from the SessionContext + pub fn standalone( + &self, + concurrent_tasks: usize, + py: Python, + ) -> PyResult { + // Build the config + let config = BallistaConfig::with_settings(self.conf.0).unwrap(); + // Define the SessionContext + let session_context = BallistaContext::standalone(&config, concurrent_tasks); + // SessionContext is an async function + let ctx = wait_for_future(py, session_context).unwrap(); + + // Convert the SessionContext into a Python SessionContext + Ok(ctx.context().into()) + } + + /// Construct the remote instance from the SessionContext + pub fn remote(url: &str, py: Python) -> PyResult { + let session_context = SessionContext::remote(url); + let ctx = wait_for_future(py, session_context)?; + + // Convert the SessionContext into a Python SessionContext + Ok(ctx.into()) + } +} + + +/* +Plan to implement Session Config and State in a future issue + /// Ballista Session Extension builder #[pyclass(name = "SessionConfig", module = "ballista", subclass)] #[derive(Clone)] @@ -94,48 +183,4 @@ impl PySessionStateBuilder { } } } - -#[pyclass(name = "Ballista", module = "ballista", subclass)] -pub struct PyBallista { - pub state: 
RefCell, -} - -#[pymethods] -impl PyBallista { - #[new] - pub fn new() -> Self { - Self { - state: RefCell::new(SessionStateBuilder::new()), - } - } - - pub fn update_state(&mut self, state: &PyCell) { - self.state = state.borrow_mut().state - } - - /// Construct the standalone instance from the SessionContext - pub fn standalone( - slf: PyRef<'_, Self>, - state: PySessionStateBuilder, - py: Python, - ) -> PyResult { - let take_state = state.take().build(); - // Define the SessionContext - let session_context = SessionContext::standalone_with_state(take_state); - // SessionContext is an async function - let ctx = wait_for_future(py, session_context).unwrap(); - - // Convert the SessionContext into a Python SessionContext - Ok(ctx.into()) - } - - #[staticmethod] - /// Construct the remote instance from the SessionContext - pub fn remote(url: &str, py: Python) -> PyResult { - let session_context = SessionContext::remote(url); - let ctx = wait_for_future(py, session_context)?; - - // Convert the SessionContext into a Python SessionContext - Ok(ctx.into()) - } -} +*/ From 0a3cbd0b30acbd89945b8a9bdc60829a7b0f4d97 Mon Sep 17 00:00:00 2001 From: tbar4 Date: Thu, 14 Nov 2024 22:07:14 -0800 Subject: [PATCH 23/33] Updating BallistaContext and Config, calling it for the night, will complete tomorrow --- python/ballista/context.py | 2 +- python/src/lib.rs | 37 +++++++++++++++++++++++-------------- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/python/ballista/context.py b/python/ballista/context.py index a09451438..314fefd50 100644 --- a/python/ballista/context.py +++ b/python/ballista/context.py @@ -40,7 +40,7 @@ def __new__(cls): def __init__(self) -> None: self.state = Ballista() - def standalone(self): + def standalone(self, concurrent_tasks: int = 4): self.standalone() def builder(self) -> BallistaBuilder: diff --git a/python/src/lib.rs b/python/src/lib.rs index 48ab40e56..c00ae8961 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -59,25 +59,30 
@@ impl PyBallistaBuilder { pub fn new() -> Self { Self(HashMap::new()) } - - pub fn set(mut slf: PyRefMut<'_, Self>, k: &str, v: &str, py: Python) -> PyResult { + + pub fn set( + mut slf: PyRefMut<'_, Self>, + k: &str, + v: &str, + py: Python, + ) -> PyResult { slf.0.insert(k.into(), v.into()); - + Ok(slf.into_py(py)) } - + pub fn show_config(&self) { println!("Ballista Config:"); for ele in self.0.iter() { println!(" {}: {}", ele.0, ele.1) } } - + pub fn build(slf: PyRef<'_, Self>) -> PyBallista { PyBallista { - conf: PyBallistaBuilder(slf.0.clone()) + conf: PyBallistaBuilder(slf.0.clone()), } - } + } } #[pyclass(name = "Ballista", module = "ballista", subclass)] @@ -93,7 +98,7 @@ impl PyBallista { conf: PyBallistaBuilder::new(), } } - + pub fn show_config(&self) { println!("Ballista Config:"); for ele in self.conf.0.clone() { @@ -108,16 +113,20 @@ impl PyBallista { py: Python, ) -> PyResult { // Build the config - let config = BallistaConfig::with_settings(self.conf.0).unwrap(); + let config = BallistaConfig::with_settings(self.conf.0.clone()).unwrap(); // Define the SessionContext let session_context = BallistaContext::standalone(&config, concurrent_tasks); // SessionContext is an async function - let ctx = wait_for_future(py, session_context).unwrap(); + let ctx = wait_for_future(py, session_context) + .unwrap() + .context() + .clone(); // Convert the SessionContext into a Python SessionContext - Ok(ctx.context().into()) + Ok(ctx.into()) } + /* /// Construct the remote instance from the SessionContext pub fn remote(url: &str, py: Python) -> PyResult { let session_context = SessionContext::remote(url); @@ -126,9 +135,9 @@ impl PyBallista { // Convert the SessionContext into a Python SessionContext Ok(ctx.into()) } + */ } - /* Plan to implement Session Config and State in a future issue @@ -171,12 +180,12 @@ impl PySessionStateBuilder { pub fn with_config(&mut self, config: PySessionConfig) -> PySessionStateBuilder { let state = 
self.state.take().with_config(config.session_config); - + PySessionStateBuilder { state: state.into() } } - + pub fn build(&mut self) -> PySessionStateBuilder { PySessionStateBuilder { state: RefCell::new(self.state.take()) From 16e6c67ff696ca8aa8f16f995f56f813eb90939b Mon Sep 17 00:00:00 2001 From: Trevor Barnes Date: Fri, 15 Nov 2024 14:19:12 -0800 Subject: [PATCH 24/33] Adding config to ballista context --- python/examples/example.py | 47 +++++++---------------------------- python/src/lib.rs | 50 ++++++++++++++++++++++---------------- 2 files changed, 38 insertions(+), 59 deletions(-) diff --git a/python/examples/example.py b/python/examples/example.py index dcaa3a087..63962352e 100644 --- a/python/examples/example.py +++ b/python/examples/example.py @@ -19,50 +19,21 @@ from datafusion.context import SessionContext # Ballista will initiate with an empty config +# set config variables with `set()` ballista = BallistaBuilder()\ .set("ballista.job.name", "example ballista")\ - .set("ballista.shuffle.partitions", "4")\ + .set("ballista.shuffle.partitions", "16")\ .set("ballista.executor.cpus", "4")\ .build() -print(ballista) +# Show the Ballista Config print(ballista.show_config()) -""" -# Create the context -ctx: SessionContext = Ballista().standalone() +# Build a standalone Cluster (use `remote()`) +# for remote cluster +ctx: SessionContext = ballista.standalone() +#ctx_remote: SessionContext = ballista.remote("remote_ip", 50050) +# Select 1 to verify its working ctx.sql("SELECT 1").show() - - -# Define custom settings -job_settings = { - "BALLISTA_JOB_NAME": "Example Ballista Job", - "DEFAULT_SHUFFLE_PARTITIONS": "2" -} - -ballista.configuration(job_settings) - -# But you can also set your own config -print("New Ballista Config: ", ballista.settings()) - -# Or you can check default settings in BallistaConfig -print("Default Shuffle Partitions: ", ballista.default_shuffle_partitions()) -# Create the Ballista Context [standalone or remote] -ctx: SessionContext = 
ballista.standalone() # Ballista.remote() - -# Register our parquet file to perform SQL operations -ctx.register_parquet("test_parquet", "./testdata/test.parquet") - -# Select the data from our test parquet file -test_parquet = ctx.sql("SELECT * FROM test_parquet") - -# Show our test parquet data -print(test_parquet.show()) - -# To perform dataframe operations, read in data -test_csv = ctx.read_csv("./testdata/test.csv", has_header=False) - -# Show the dataframe -test_csv.show() -""" +#ctx_remote.sql("SELECT 2").show() \ No newline at end of file diff --git a/python/src/lib.rs b/python/src/lib.rs index c00ae8961..7b623456f 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -15,21 +15,15 @@ // specific language governing permissions and limitations // under the License. -use ballista::extension::SessionConfigExt; use ballista::prelude::*; -use ballista_core::utils::SessionStateExt; -use datafusion::catalog::Session; -use datafusion::execution::{SessionState, SessionStateBuilder}; -use datafusion::prelude::*; use datafusion_python::context::PySessionContext as DataFusionPythonSessionContext; use datafusion_python::utils::wait_for_future; use std::collections::HashMap; -use std::fmt::Formatter; -use std::path::Display; use pyo3::prelude::*; mod utils; +use utils::to_pyerr; #[pymodule] fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { @@ -51,13 +45,15 @@ fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { // Ballista Builder will take a HasMap/Dict Cionfg #[pyclass(name = "BallistaBuilder", module = "ballista", subclass)] -pub struct PyBallistaBuilder(HashMap); +pub struct PyBallistaBuilder { + conf: HashMap, +} #[pymethods] impl PyBallistaBuilder { #[new] pub fn new() -> Self { - Self(HashMap::new()) + Self { conf: HashMap::new() } } pub fn set( @@ -66,21 +62,23 @@ impl PyBallistaBuilder { v: &str, py: Python, ) -> PyResult { - slf.0.insert(k.into(), v.into()); + slf.conf.insert(k.into(), v.into()); 
Ok(slf.into_py(py)) } pub fn show_config(&self) { println!("Ballista Config:"); - for ele in self.0.iter() { - println!(" {}: {}", ele.0, ele.1) + for ele in self.conf.iter() { + println!("\t{}: {}", ele.0, ele.1) } } pub fn build(slf: PyRef<'_, Self>) -> PyBallista { PyBallista { - conf: PyBallistaBuilder(slf.0.clone()), + conf: PyBallistaBuilder { + conf: slf.conf.clone(), + }, } } } @@ -101,24 +99,25 @@ impl PyBallista { pub fn show_config(&self) { println!("Ballista Config:"); - for ele in self.conf.0.clone() { + for ele in self.conf.conf.clone() { println!("{:4}: {}", ele.0, ele.1) } } /// Construct the standalone instance from the SessionContext + #[pyo3(signature = (concurrent_tasks = 4))] pub fn standalone( &self, concurrent_tasks: usize, py: Python, ) -> PyResult { // Build the config - let config = BallistaConfig::with_settings(self.conf.0.clone()).unwrap(); + let config = &BallistaConfig::with_settings(self.conf.conf.clone()).unwrap(); // Define the SessionContext let session_context = BallistaContext::standalone(&config, concurrent_tasks); // SessionContext is an async function let ctx = wait_for_future(py, session_context) - .unwrap() + .map_err(to_pyerr)? .context() .clone(); @@ -126,16 +125,25 @@ impl PyBallista { Ok(ctx.into()) } - /* /// Construct the remote instance from the SessionContext - pub fn remote(url: &str, py: Python) -> PyResult { - let session_context = SessionContext::remote(url); - let ctx = wait_for_future(py, session_context)?; + pub fn remote( + &self, + host: &str, + port: u16, + py: Python, + ) -> PyResult { + // Build the config + let config = &BallistaConfig::with_settings(self.conf.conf.clone()).unwrap(); + // Create the BallistaContext + let session_context = BallistaContext::remote(host, port, config); + let ctx = wait_for_future(py, session_context) + .map_err(to_pyerr)? 
+ .context() + .clone(); // Convert the SessionContext into a Python SessionContext Ok(ctx.into()) } - */ } /* From 23b1957f7874295031055dd1bb62ed5f92354df2 Mon Sep 17 00:00:00 2001 From: Trevor Barnes Date: Fri, 15 Nov 2024 14:25:31 -0800 Subject: [PATCH 25/33] Adding config to ballista context --- docs/source/user-guide/python.md | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/docs/source/user-guide/python.md b/docs/source/user-guide/python.md index 10eaa3e08..e47b2f90e 100644 --- a/docs/source/user-guide/python.md +++ b/docs/source/user-guide/python.md @@ -31,12 +31,22 @@ The following code demonstrates how to create a Ballista context and connect to If you are running a standalone cluster (runs locally), all you need to do is call the stand alone cluster method `standalone()` or your BallistaContext. If you are running a cluster in remote mode, you need to provide the URL `Ballista.remote("http://my-remote-ip:50050")`. ```text ->>> from ballista import Ballista +>>> from ballista import Ballista, BallistaBuilder >>> # for a standalone instance ->>> ctx = Ballista.standalone() +>>> # Ballista will initiate with an empty config +>>> # set config variables with `set()` +>>> ballista = BallistaBuilder()\ +>>> .set("ballista.job.name", "example ballista")\ +>>> .set("ballista.shuffle.partitions", "16")\ +>>> .set("ballista.executor.cpus", "4")\ +>>> .build() +>>> +>>> # Show the Ballista Config +>>> print(ballista.show_config()) +>>> ctx = ballista.standalone() >>> >>> # for a remote instance provide the URL ->>> ctx = Ballista.remote("http://url-path-to-scheduler:50050") +>>> ctx = ballista.remote("df://url-path-to-scheduler:50050") ``` ## SQL @@ -109,7 +119,7 @@ The `explain` method can be used to show the logical and physical query plans fo The following example demonstrates creating arrays with PyArrow and then creating a Ballista DataFrame. 
```python -from ballista import Ballista +from ballista import Ballista, BallistaBuilder import pyarrow # an alias From 108736da1efb23a75e70031400fd95feffa413c9 Mon Sep 17 00:00:00 2001 From: Trevor Barnes Date: Fri, 15 Nov 2024 16:52:55 -0800 Subject: [PATCH 26/33] Adding config to ballista context --- python/ballista/__init__.py | 2 - python/ballista/context.py | 49 +------------ python/examples/example.py | 18 ++--- python/src/lib.rs | 140 ++++++++---------------------------- 4 files changed, 38 insertions(+), 171 deletions(-) diff --git a/python/ballista/__init__.py b/python/ballista/__init__.py index 99486ae55..70a507605 100644 --- a/python/ballista/__init__.py +++ b/python/ballista/__init__.py @@ -26,7 +26,6 @@ import pyarrow as pa from .ballista_internal import ( - Ballista, BallistaBuilder, #SessionConfig, #SessionStateBuilder, @@ -36,7 +35,6 @@ __version__ = importlib_metadata.version(__name__) __all__ = [ - "Ballista", "BallistaBuilder", #"SessionConfig", #"SessionStateBuilder", diff --git a/python/ballista/context.py b/python/ballista/context.py index 314fefd50..68e54a51e 100644 --- a/python/ballista/context.py +++ b/python/ballista/context.py @@ -17,7 +17,7 @@ from _typeshed import Self from datafusion import SessionContext -from ballista import Ballista, BallistaBuilder +from ballista import BallistaBuilder from typing import List, Any @@ -29,51 +29,8 @@ def __new__(cls): def __init__(self) -> None: self = {} - def set(self, k, v) -> Self: - return self.set(k, v) - - -class Ballista: - def __new__(cls): - return super().__new__(cls) - - def __init__(self) -> None: - self.state = Ballista() + def config(self, k, v) -> Self: + return self.config(k, v) def standalone(self, concurrent_tasks: int = 4): self.standalone() - - def builder(self) -> BallistaBuilder: - return BallistaBuilder() - -""" -### Future State Implementation -class SessionState: - def __new__(cls): - return super().__new__(cls) - -class SessionConfig: - def __new__(cls): - return 
super().__new__(cls) - - def __init__(self): - self.session_config = SessionConfig() - - def set_str(self, key: str, value: str): - self.session_config.set_str(key, value) - -class SessionStateBuilder: - def __new__(cls): - return super().__new__(cls) - - def __init__(self) -> None: - self.state = SessionStateBuilder() - - def with_config(self, config: SessionConfig) -> SessionStateBuilder: - self.with_config(config) - - return self - - def build(self) -> SessionState: - self.build() -""" \ No newline at end of file diff --git a/python/examples/example.py b/python/examples/example.py index 63962352e..a67c48e9d 100644 --- a/python/examples/example.py +++ b/python/examples/example.py @@ -15,23 +15,17 @@ # specific language governing permissions and limitations # under the License. -from ballista import Ballista, BallistaBuilder +from ballista import BallistaBuilder from datafusion.context import SessionContext # Ballista will initiate with an empty config # set config variables with `set()` -ballista = BallistaBuilder()\ - .set("ballista.job.name", "example ballista")\ - .set("ballista.shuffle.partitions", "16")\ - .set("ballista.executor.cpus", "4")\ - .build() +ctx: SessionContext = BallistaBuilder()\ + .config("ballista.job.name", "example ballista")\ + .config("ballista.shuffle.partitions", "16")\ + .config("ballista.executor.cpus", "4")\ + .remote("http://10.103.0.25:50050") -# Show the Ballista Config -print(ballista.show_config()) - -# Build a standalone Cluster (use `remote()`) -# for remote cluster -ctx: SessionContext = ballista.standalone() #ctx_remote: SessionContext = ballista.remote("remote_ip", 50050) # Select 1 to verify its working diff --git a/python/src/lib.rs b/python/src/lib.rs index 7b623456f..dfc20e19c 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -16,6 +16,8 @@ // under the License. 
use ballista::prelude::*; +use datafusion::execution::SessionStateBuilder; +use datafusion::prelude::*; use datafusion_python::context::PySessionContext as DataFusionPythonSessionContext; use datafusion_python::utils::wait_for_future; @@ -29,7 +31,6 @@ use utils::to_pyerr; fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { pyo3_log::init(); // Ballista structs - m.add_class::()?; m.add_class::()?; // DataFusion structs m.add_class::()?; @@ -53,10 +54,12 @@ pub struct PyBallistaBuilder { impl PyBallistaBuilder { #[new] pub fn new() -> Self { - Self { conf: HashMap::new() } + Self { + conf: HashMap::new(), + } } - pub fn set( + pub fn config( mut slf: PyRefMut<'_, Self>, k: &str, v: &str, @@ -74,52 +77,20 @@ impl PyBallistaBuilder { } } - pub fn build(slf: PyRef<'_, Self>) -> PyBallista { - PyBallista { - conf: PyBallistaBuilder { - conf: slf.conf.clone(), - }, - } - } -} - -#[pyclass(name = "Ballista", module = "ballista", subclass)] -pub struct PyBallista { - pub conf: PyBallistaBuilder, -} - -#[pymethods] -impl PyBallista { - #[new] - pub fn new() -> Self { - Self { - conf: PyBallistaBuilder::new(), - } - } - - pub fn show_config(&self) { - println!("Ballista Config:"); - for ele in self.conf.conf.clone() { - println!("{:4}: {}", ele.0, ele.1) - } - } - /// Construct the standalone instance from the SessionContext - #[pyo3(signature = (concurrent_tasks = 4))] - pub fn standalone( - &self, - concurrent_tasks: usize, - py: Python, - ) -> PyResult { + pub fn standalone(&self, py: Python) -> PyResult { // Build the config - let config = &BallistaConfig::with_settings(self.conf.conf.clone()).unwrap(); - // Define the SessionContext - let session_context = BallistaContext::standalone(&config, concurrent_tasks); + let config: SessionConfig = SessionConfig::from_string_hash_map(&self.conf)?; + // Build the state + let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .build(); + // Build the context + let 
standalone_session = SessionContext::standalone_with_state(state); + // SessionContext is an async function - let ctx = wait_for_future(py, session_context) - .map_err(to_pyerr)? - .context() - .clone(); + let ctx = wait_for_future(py, standalone_session)?; // Convert the SessionContext into a Python SessionContext Ok(ctx.into()) @@ -128,76 +99,23 @@ impl PyBallista { /// Construct the remote instance from the SessionContext pub fn remote( &self, - host: &str, - port: u16, + url: &str, py: Python, ) -> PyResult { // Build the config - let config = &BallistaConfig::with_settings(self.conf.conf.clone()).unwrap(); - // Create the BallistaContext - let session_context = BallistaContext::remote(host, port, config); - let ctx = wait_for_future(py, session_context) - .map_err(to_pyerr)? - .context() - .clone(); + let config: SessionConfig = SessionConfig::from_string_hash_map(&self.conf)?; + // Build the state + let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .build(); + // Build the context + let remote_session = SessionContext::remote_with_state(url, state); + + // SessionContext is an async function + let ctx = wait_for_future(py, remote_session)?; // Convert the SessionContext into a Python SessionContext Ok(ctx.into()) } } - -/* -Plan to implement Session Config and State in a future issue - -/// Ballista Session Extension builder -#[pyclass(name = "SessionConfig", module = "ballista", subclass)] -#[derive(Clone)] -pub struct PySessionConfig { - pub session_config: SessionConfig, -} - -#[pymethods] -impl PySessionConfig { - #[new] - pub fn new() -> Self { - let session_config = SessionConfig::new_with_ballista(); - - Self { session_config } - } - - pub fn set_str(&mut self, key: &str, value: &str) -> Self { - self.session_config.options_mut().set(key, value); - - self.clone() - } -} - -#[pyclass(name = "SessionStateBuilder", module = "ballista", subclass)] -pub struct PySessionStateBuilder { - pub state: RefCell, -} - 
-#[pymethods] -impl PySessionStateBuilder { - #[new] - pub fn new() -> Self { - Self { - state: RefCell::new(SessionStateBuilder::new()), - } - } - - pub fn with_config(&mut self, config: PySessionConfig) -> PySessionStateBuilder { - let state = self.state.take().with_config(config.session_config); - - PySessionStateBuilder { - state: state.into() - } - } - - pub fn build(&mut self) -> PySessionStateBuilder { - PySessionStateBuilder { - state: RefCell::new(self.state.take()) - } - } -} -*/ From dd1f8c9f6042770b183f137dcf5df314a2949cac Mon Sep 17 00:00:00 2001 From: Trevor Barnes Date: Fri, 15 Nov 2024 16:54:59 -0800 Subject: [PATCH 27/33] Adding config to ballista context --- python/examples/example.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/examples/example.py b/python/examples/example.py index a67c48e9d..e09690553 100644 --- a/python/examples/example.py +++ b/python/examples/example.py @@ -24,7 +24,7 @@ .config("ballista.job.name", "example ballista")\ .config("ballista.shuffle.partitions", "16")\ .config("ballista.executor.cpus", "4")\ - .remote("http://10.103.0.25:50050") + .standalone() #ctx_remote: SessionContext = ballista.remote("remote_ip", 50050) From 4f08364eddceccae52c32cac869becfd699074b3 Mon Sep 17 00:00:00 2001 From: tbar4 Date: Sun, 17 Nov 2024 19:57:57 -0800 Subject: [PATCH 28/33] Updated Builder and Docs --- docs/source/user-guide/python.md | 13 ++++--------- python/README.md | 4 ++-- python/ballista/__init__.py | 6 ------ python/ballista/tests/test_context.py | 21 +++++++++++---------- python/examples/example.py | 3 +-- python/src/lib.rs | 24 +++++------------------- 6 files changed, 23 insertions(+), 48 deletions(-) diff --git a/docs/source/user-guide/python.md b/docs/source/user-guide/python.md index e47b2f90e..d9ef39995 100644 --- a/docs/source/user-guide/python.md +++ b/docs/source/user-guide/python.md @@ -31,18 +31,13 @@ The following code demonstrates how to create a Ballista context and connect to If you 
are running a standalone cluster (runs locally), all you need to do is call the stand alone cluster method `standalone()` or your BallistaContext. If you are running a cluster in remote mode, you need to provide the URL `Ballista.remote("http://my-remote-ip:50050")`. ```text ->>> from ballista import Ballista, BallistaBuilder +>>> from ballista import BallistaBuilder >>> # for a standalone instance >>> # Ballista will initiate with an empty config >>> # set config variables with `set()` >>> ballista = BallistaBuilder()\ ->>> .set("ballista.job.name", "example ballista")\ ->>> .set("ballista.shuffle.partitions", "16")\ ->>> .set("ballista.executor.cpus", "4")\ ->>> .build() +>>> .config("ballista.job.name", "example ballista") >>> ->>> # Show the Ballista Config ->>> print(ballista.show_config()) >>> ctx = ballista.standalone() >>> >>> # for a remote instance provide the URL @@ -119,7 +114,7 @@ The `explain` method can be used to show the logical and physical query plans fo The following example demonstrates creating arrays with PyArrow and then creating a Ballista DataFrame. ```python -from ballista import Ballista, BallistaBuilder +from ballista import BallistaBuilder import pyarrow # an alias @@ -127,7 +122,7 @@ import pyarrow f = ballista.functions # create a context -ctx = Ballista.standalone() +ctx = BallistaBuilder().standalone() # create a RecordBatch and a new DataFrame from it batch = pyarrow.RecordBatch.from_arrays( diff --git a/python/README.md b/python/README.md index c9f92eaa0..01b0a7f90 100644 --- a/python/README.md +++ b/python/README.md @@ -29,8 +29,8 @@ part of the default Cargo workspace so that it doesn't cause overhead for mainta Creates a new context and connects to a Ballista scheduler process. 
```python -from ballista import Ballista ->>> ctx = Ballista.standalone() +from ballista import BallistaBuilder +>>> ctx = BallistaBuilder().standalone() ``` ## Example SQL Usage diff --git a/python/ballista/__init__.py b/python/ballista/__init__.py index 70a507605..a143f17e9 100644 --- a/python/ballista/__init__.py +++ b/python/ballista/__init__.py @@ -27,16 +27,10 @@ from .ballista_internal import ( BallistaBuilder, - #SessionConfig, - #SessionStateBuilder, - #SessionState ) __version__ = importlib_metadata.version(__name__) __all__ = [ "BallistaBuilder", - #"SessionConfig", - #"SessionStateBuilder", - #"SessionState" ] \ No newline at end of file diff --git a/python/ballista/tests/test_context.py b/python/ballista/tests/test_context.py index 97d90e42b..a0af9592b 100644 --- a/python/ballista/tests/test_context.py +++ b/python/ballista/tests/test_context.py @@ -15,27 +15,27 @@ # specific language governing permissions and limitations # under the License. -from ballista import SessionContext +from ballista import BallistaBuilder import pytest def test_create_context(): - SessionContext("localhost", 50050) + BallistaBuilder().standalone() def test_select_one(): - ctx = SessionContext("localhost", 50050) + ctx = BallistaBuilder().standalone() df = ctx.sql("SELECT 1") batches = df.collect() assert len(batches) == 1 def test_read_csv(): - ctx = SessionContext("localhost", 50050) + ctx = BallistaBuilder().standalone() df = ctx.read_csv("testdata/test.csv", has_header=True) batches = df.collect() assert len(batches) == 1 assert len(batches[0]) == 1 def test_register_csv(): - ctx = SessionContext("localhost", 50050) + ctx = BallistaBuilder().standalone() ctx.register_csv("test", "testdata/test.csv", has_header=True) df = ctx.sql("SELECT * FROM test") batches = df.collect() @@ -43,14 +43,14 @@ def test_register_csv(): assert len(batches[0]) == 1 def test_read_parquet(): - ctx = SessionContext("localhost", 50050) + ctx = BallistaBuilder().standalone() df = 
ctx.read_parquet("testdata/test.parquet") batches = df.collect() assert len(batches) == 1 assert len(batches[0]) == 8 def test_register_parquet(): - ctx = SessionContext("localhost", 50050) + ctx = BallistaBuilder().standalone() ctx.register_parquet("test", "testdata/test.parquet") df = ctx.sql("SELECT * FROM test") batches = df.collect() @@ -58,7 +58,7 @@ def test_register_parquet(): assert len(batches[0]) == 8 def test_read_dataframe_api(): - ctx = SessionContext("localhost", 50050) + ctx = BallistaBuilder().standalone() df = ctx.read_csv("testdata/test.csv", has_header=True) \ .select_columns('a', 'b') \ .limit(1) @@ -67,11 +67,12 @@ def test_read_dataframe_api(): assert len(batches[0]) == 1 def test_execute_plan(): - ctx = SessionContext("localhost", 50050) + ctx = BallistaBuilder().standalone() df = ctx.read_csv("testdata/test.csv", has_header=True) \ .select_columns('a', 'b') \ .limit(1) - df = ctx.execute_logical_plan(df.logical_plan()) + # TODO research SessionContext Logical Plan for DataFusionPython + #df = ctx.execute_logical_plan(df.logical_plan()) batches = df.collect() assert len(batches) == 1 assert len(batches[0]) == 1 diff --git a/python/examples/example.py b/python/examples/example.py index e09690553..61a9abbd2 100644 --- a/python/examples/example.py +++ b/python/examples/example.py @@ -19,11 +19,10 @@ from datafusion.context import SessionContext # Ballista will initiate with an empty config -# set config variables with `set()` +# set config variables with `config` ctx: SessionContext = BallistaBuilder()\ .config("ballista.job.name", "example ballista")\ .config("ballista.shuffle.partitions", "16")\ - .config("ballista.executor.cpus", "4")\ .standalone() #ctx_remote: SessionContext = ballista.remote("remote_ip", 50050) diff --git a/python/src/lib.rs b/python/src/lib.rs index dfc20e19c..219eb5769 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -18,7 +18,7 @@ use ballista::prelude::*; use datafusion::execution::SessionStateBuilder; use 
datafusion::prelude::*; -use datafusion_python::context::PySessionContext as DataFusionPythonSessionContext; +use datafusion_python::context::PySessionContext; use datafusion_python::utils::wait_for_future; use std::collections::HashMap; @@ -30,17 +30,10 @@ use utils::to_pyerr; #[pymodule] fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { pyo3_log::init(); - // Ballista structs + // BallistaBuilder struct m.add_class::()?; - // DataFusion structs + // DataFusion struct m.add_class::()?; - // Ballista Config - /* - // Future implementation will include more state and config options - m.add_class::()?; - m.add_class::()?; - m.add_class::()?; - */ Ok(()) } @@ -70,15 +63,8 @@ impl PyBallistaBuilder { Ok(slf.into_py(py)) } - pub fn show_config(&self) { - println!("Ballista Config:"); - for ele in self.conf.iter() { - println!("\t{}: {}", ele.0, ele.1) - } - } - /// Construct the standalone instance from the SessionContext - pub fn standalone(&self, py: Python) -> PyResult { + pub fn standalone(&self, py: Python) -> PyResult { // Build the config let config: SessionConfig = SessionConfig::from_string_hash_map(&self.conf)?; // Build the state @@ -101,7 +87,7 @@ impl PyBallistaBuilder { &self, url: &str, py: Python, - ) -> PyResult { + ) -> PyResult { // Build the config let config: SessionConfig = SessionConfig::from_string_hash_map(&self.conf)?; // Build the state From 1449e832572b4638f0bd15b6b952a3568367be7f Mon Sep 17 00:00:00 2001 From: tbar4 Date: Sun, 17 Nov 2024 20:03:06 -0800 Subject: [PATCH 29/33] Updated Builder and Docs --- python/ballista/context.py | 36 ------------------------------------ 1 file changed, 36 deletions(-) delete mode 100644 python/ballista/context.py diff --git a/python/ballista/context.py b/python/ballista/context.py deleted file mode 100644 index 68e54a51e..000000000 --- a/python/ballista/context.py +++ /dev/null @@ -1,36 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor 
license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from _typeshed import Self -from datafusion import SessionContext -from ballista import BallistaBuilder - -from typing import List, Any - - -class BallistaBuilder: - def __new__(cls): - return super().__new__(cls) - - def __init__(self) -> None: - self = {} - - def config(self, k, v) -> Self: - return self.config(k, v) - - def standalone(self, concurrent_tasks: int = 4): - self.standalone() From 915cd2418edc484313e3a6319b026c89f6ab8e49 Mon Sep 17 00:00:00 2001 From: tbar4 Date: Mon, 18 Nov 2024 20:48:17 -0800 Subject: [PATCH 30/33] Updated Builder and Docs --- python/src/context.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 python/src/context.rs diff --git a/python/src/context.rs b/python/src/context.rs new file mode 100644 index 000000000..6e778edd7 --- /dev/null +++ b/python/src/context.rs @@ -0,0 +1,16 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. \ No newline at end of file From 4b85bfc43a4f5769eefe169011e9b80171daa6fb Mon Sep 17 00:00:00 2001 From: tbar4 Date: Mon, 18 Nov 2024 20:50:52 -0800 Subject: [PATCH 31/33] Updated Builder and Docs --- python/src/context.rs | 16 ---------------- 1 file changed, 16 deletions(-) delete mode 100644 python/src/context.rs diff --git a/python/src/context.rs b/python/src/context.rs deleted file mode 100644 index 6e778edd7..000000000 --- a/python/src/context.rs +++ /dev/null @@ -1,16 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
\ No newline at end of file From 0887363a58f4ebd0a32918272cdc13d04ccc6498 Mon Sep 17 00:00:00 2001 From: tbar4 Date: Mon, 18 Nov 2024 20:53:26 -0800 Subject: [PATCH 32/33] Updated Builder and Docs --- python/src/lib.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/python/src/lib.rs b/python/src/lib.rs index 219eb5769..41b4b6d31 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -83,11 +83,7 @@ impl PyBallistaBuilder { } /// Construct the remote instance from the SessionContext - pub fn remote( - &self, - url: &str, - py: Python, - ) -> PyResult { + pub fn remote(&self, url: &str, py: Python) -> PyResult { // Build the config let config: SessionConfig = SessionConfig::from_string_hash_map(&self.conf)?; // Build the state From b979a37d177d4f57354273662017d81d3764a7a5 Mon Sep 17 00:00:00 2001 From: tbar4 Date: Mon, 18 Nov 2024 20:56:48 -0800 Subject: [PATCH 33/33] Updated Builder and Docs --- docs/source/user-guide/python.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/source/user-guide/python.md b/docs/source/user-guide/python.md index d9ef39995..f17ac68d4 100644 --- a/docs/source/user-guide/python.md +++ b/docs/source/user-guide/python.md @@ -34,10 +34,10 @@ If you are running a standalone cluster (runs locally), all you need to do is ca >>> from ballista import BallistaBuilder >>> # for a standalone instance >>> # Ballista will initiate with an empty config ->>> # set config variables with `set()` +>>> # set config variables with `config()` >>> ballista = BallistaBuilder()\ >>> .config("ballista.job.name", "example ballista") ->>> +>>> >>> ctx = ballista.standalone() >>> >>> # for a remote instance provide the URL