diff --git a/.github/workflows/build-kernel-wheels.yml b/.github/workflows/build-kernel-wheels.yml index fc7bf6e13..038b82c0b 100644 --- a/.github/workflows/build-kernel-wheels.yml +++ b/.github/workflows/build-kernel-wheels.yml @@ -70,7 +70,7 @@ jobs: shell: bash - name: Upload wheels - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: wheels path: python/delta-kernel-rust-sharing-wrapper/target/wheels/*.whl diff --git a/python/delta-kernel-rust-sharing-wrapper/.gitignore b/python/delta-kernel-rust-sharing-wrapper/.gitignore new file mode 100644 index 000000000..03314f77b --- /dev/null +++ b/python/delta-kernel-rust-sharing-wrapper/.gitignore @@ -0,0 +1 @@ +Cargo.lock diff --git a/python/delta-kernel-rust-sharing-wrapper/Cargo.toml b/python/delta-kernel-rust-sharing-wrapper/Cargo.toml index ae48fd0fb..74e9d5c31 100644 --- a/python/delta-kernel-rust-sharing-wrapper/Cargo.toml +++ b/python/delta-kernel-rust-sharing-wrapper/Cargo.toml @@ -11,7 +11,7 @@ crate-type = ["cdylib"] [dependencies] arrow = { version = "53.3.0", features = ["pyarrow"] } -delta_kernel = {version = "0.5", features = ["cloud", "default", "default-engine"]} +delta_kernel = { git = "https://github.com/delta-io/delta-kernel-rs.git", rev = "702e12f", features = ["cloud", "default-engine"]} openssl = { version = "0.10", features = ["vendored"] } url = "2" diff --git a/python/delta-kernel-rust-sharing-wrapper/src/lib.rs b/python/delta-kernel-rust-sharing-wrapper/src/lib.rs index 94a8dd23e..4d5fef7fb 100644 --- a/python/delta-kernel-rust-sharing-wrapper/src/lib.rs +++ b/python/delta-kernel-rust-sharing-wrapper/src/lib.rs @@ -1,20 +1,22 @@ use std::sync::Arc; use arrow::compute::filter_record_batch; -use arrow::datatypes::SchemaRef; +use arrow::datatypes::SchemaRef as ArrowSchemaRef; use arrow::error::ArrowError; use arrow::pyarrow::PyArrowType; -use delta_kernel::engine::arrow_data::ArrowEngineData; +use arrow::record_batch::{RecordBatch, RecordBatchIterator, RecordBatchReader}; + use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; +use delta_kernel::scan::ScanResult; +use delta_kernel::{engine::arrow_data::ArrowEngineData, schema::StructType}; +use delta_kernel::{DeltaResult, Engine}; + use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use url::Url; -use arrow::record_batch::{RecordBatch, RecordBatchIterator, RecordBatchReader}; -use delta_kernel::Engine; - use std::collections::HashMap; struct KernelError(delta_kernel::Error); @@ -73,11 +75,45 @@ impl ScanBuilder { } fn build(&mut self) -> DeltaPyResult { - let scan = self.0.take().unwrap().build()?; + let scan = self + .0 + .take() + .ok_or_else(|| { + delta_kernel::Error::generic("Can only call build() once on ScanBuilder") + })? + .build()?; Ok(Scan(scan)) } } +fn try_get_schema(schema: &Arc) -> Result { + Ok(Arc::new(schema.as_ref().try_into().map_err(|e| { + delta_kernel::Error::Generic(format!("Could not get result schema: {e}")) + })?)) +} + +fn try_create_record_batch_iter( + results: impl Iterator>, + result_schema: ArrowSchemaRef, +) -> RecordBatchIterator>> { + let record_batches = results.map(|res| { + let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?))); + let (mask, data) = scan_res.map_err(|e| ArrowError::from_external_error(Box::new(e)))?; + let record_batch: RecordBatch = data + .into_any() + .downcast::() + .map_err(|_| ArrowError::CastError("Couldn't cast to ArrowEngineData".to_string()))? + .into(); + if let Some(mask) = mask { + let filtered_batch = filter_record_batch(&record_batch, &mask.into())?; + Ok(filtered_batch) + } else { + Ok(record_batch) + } + }); + RecordBatchIterator::new(record_batches, result_schema) +} + #[pyclass] struct Scan(delta_kernel::scan::Scan); @@ -87,38 +123,67 @@ impl Scan { &self, engine_interface: &PythonInterface, ) -> DeltaPyResult>> { - let result_schema: SchemaRef = - Arc::new(self.0.schema().as_ref().try_into().map_err(|e| { - delta_kernel::Error::Generic(format!("Could not get result schema: {e}")) - })?); - let results = self.0.execute(engine_interface.0.as_ref())?; - let record_batches: Vec<_> = results - .map(|res| { - let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?))); - let (mask, data) = - scan_res.map_err(|e| ArrowError::from_external_error(Box::new(e)))?; - let record_batch: RecordBatch = data - .into_any() - .downcast::() - .map_err(|_| { - ArrowError::CastError("Couldn't cast to ArrowEngineData".to_string()) - })? - .into(); - if let Some(mask) = mask { - let filtered_batch = filter_record_batch(&record_batch, &mask.into())?; - Ok(filtered_batch) - } else { - Ok(record_batch) - } - }) - .collect(); - let record_batch_iter = RecordBatchIterator::new(record_batches, result_schema); + let result_schema: ArrowSchemaRef = try_get_schema(self.0.schema())?; + let results = self.0.execute(engine_interface.0.clone())?; + let record_batch_iter = try_create_record_batch_iter(results, result_schema); + Ok(PyArrowType(Box::new(record_batch_iter))) + } +} + +#[pyclass] +struct TableChangesScanBuilder(Option); + +#[pymethods] +impl TableChangesScanBuilder { + #[new] + #[pyo3(signature = (table, engine_interface, start_version, end_version=None))] + fn new( + table: &Table, + engine_interface: &PythonInterface, + start_version: u64, + end_version: Option, + ) -> DeltaPyResult { + let table_changes = + table + .0 + .table_changes(engine_interface.0.as_ref(), start_version, end_version)?; + Ok(TableChangesScanBuilder(Some( + table_changes.into_scan_builder(), + ))) + } + + fn build(&mut self) -> DeltaPyResult { + let scan = self + .0 + .take() + .ok_or_else(|| { + delta_kernel::Error::generic( + "Can only call build() once on TableChangesScanBuilder", + ) + })? + .build()?; + Ok(TableChangesScan(scan)) + } +} + +#[pyclass] +struct TableChangesScan(delta_kernel::table_changes::scan::TableChangesScan); + +#[pymethods] +impl TableChangesScan { + fn execute( + &self, + engine_interface: &PythonInterface, + ) -> DeltaPyResult>> { + let result_schema: ArrowSchemaRef = try_get_schema(self.0.schema())?; + let results = self.0.execute(engine_interface.0.clone())?; + let record_batch_iter = try_create_record_batch_iter(results, result_schema); Ok(PyArrowType(Box::new(record_batch_iter))) } } #[pyclass] -struct PythonInterface(Box); +struct PythonInterface(Arc); #[pymethods] impl PythonInterface { @@ -130,7 +195,7 @@ impl PythonInterface { HashMap::::new(), Arc::new(TokioBackgroundExecutor::new()), )?; - Ok(PythonInterface(Box::new(client))) + Ok(PythonInterface(Arc::new(client))) } } @@ -144,5 +209,7 @@ fn delta_kernel_rust_sharing_wrapper(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; Ok(()) }