diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index a8a24dd07..14e6423b8 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -37,7 +37,7 @@ jobs: cargo install cargo-msrv --locked - name: verify-msrv run: | - cargo msrv --path kernel/ verify --all-features + cargo msrv --path kernel/ verify --features $(cat .github/workflows/default-kernel-features) cargo msrv --path derive-macros/ verify --all-features cargo msrv --path ffi/ verify --all-features cargo msrv --path ffi-proc-macros/ verify --all-features @@ -104,7 +104,7 @@ jobs: - name: check kernel builds with no-default-features run: cargo build -p delta_kernel --no-default-features - name: build and lint with clippy - run: cargo clippy --benches --tests --all-features -- -D warnings + run: cargo clippy --benches --tests --features $(cat .github/workflows/default-kernel-features) -- -D warnings - name: lint without default features run: cargo clippy --no-default-features -- -D warnings - name: check kernel builds with default-engine @@ -129,7 +129,7 @@ jobs: override: true - uses: Swatinem/rust-cache@v2 - name: test - run: cargo test --workspace --verbose --all-features -- --skip read_table_version_hdfs + run: cargo test --workspace --verbose --features $(cat .github/workflows/default-kernel-features) -- --skip read_table_version_hdfs ffi_test: runs-on: ${{ matrix.os }} @@ -229,7 +229,7 @@ jobs: uses: taiki-e/install-action@cargo-llvm-cov - uses: Swatinem/rust-cache@v2 - name: Generate code coverage - run: cargo llvm-cov --all-features --workspace --codecov --output-path codecov.json -- --skip read_table_version_hdfs + run: cargo llvm-cov --features $(cat .github/workflows/default-kernel-features) --workspace --codecov --output-path codecov.json -- --skip read_table_version_hdfs - name: Upload coverage to Codecov uses: codecov/codecov-action@v5 with: diff --git a/.github/workflows/default-kernel-features b/.github/workflows/default-kernel-features new file mode 100644 index 000000000..bee74feef --- /dev/null +++ b/.github/workflows/default-kernel-features @@ -0,0 +1 @@ +integration-test,default-engine,default-engine-rustls,cloud,arrow,sync-engine diff --git a/Cargo.toml b/Cargo.toml index ec7993736..aec38fc78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,21 +23,6 @@ rust-version = "1.80" version = "0.6.1" [workspace.dependencies] -# When changing the arrow version range, also modify ffi/Cargo.toml which has -# its own arrow version ranges witeh modified features. 
Failure to do so will -# result in compilation errors as two different sets of arrow dependencies may -# be sourced -arrow = { version = ">=53, <55" } -arrow-arith = { version = ">=53, <55" } -arrow-array = { version = ">=53, <55" } -arrow-buffer = { version = ">=53, <55" } -arrow-cast = { version = ">=53, <55" } -arrow-data = { version = ">=53, <55" } -arrow-ord = { version = ">=53, <55" } -arrow-json = { version = ">=53, <55" } -arrow-select = { version = ">=53, <55" } -arrow-schema = { version = ">=53, <55" } -parquet = { version = ">=53, <55", features = ["object_store"] } object_store = { version = ">=0.11, <0.12" } hdfs-native-object-store = "0.12.0" hdfs-native = "0.10.0" diff --git a/acceptance/Cargo.toml b/acceptance/Cargo.toml index 2854c7c39..e844007ef 100644 --- a/acceptance/Cargo.toml +++ b/acceptance/Cargo.toml @@ -14,19 +14,14 @@ rust-version.workspace = true release = false [dependencies] -arrow-array = { workspace = true } -arrow-cast = { workspace = true } -arrow-ord = { workspace = true } -arrow-select = { workspace = true } -arrow-schema = { workspace = true } delta_kernel = { path = "../kernel", features = [ "default-engine", + "arrow_53", "developer-visibility", ] } futures = "0.3" itertools = "0.13" object_store = { workspace = true } -parquet = { workspace = true } serde = { version = "1", features = ["derive"] } serde_json = "1" thiserror = "1" diff --git a/acceptance/src/data.rs b/acceptance/src/data.rs index c515d50c9..b045634b5 100644 --- a/acceptance/src/data.rs +++ b/acceptance/src/data.rs @@ -1,15 +1,18 @@ use std::{path::Path, sync::Arc}; -use arrow_array::{Array, RecordBatch}; -use arrow_ord::sort::{lexsort_to_indices, SortColumn}; -use arrow_schema::{DataType, Schema}; -use arrow_select::{concat::concat_batches, filter::filter_record_batch, take::take}; +use delta_kernel::arrow::array::{Array, RecordBatch}; +use delta_kernel::arrow::compute::{ + concat_batches, filter_record_batch, lexsort_to_indices, take, SortColumn, +}; +use delta_kernel::arrow::datatypes::{DataType, Schema}; +use delta_kernel::parquet::arrow::async_reader::{ + ParquetObjectReader, ParquetRecordBatchStreamBuilder, +}; use delta_kernel::{engine::arrow_data::ArrowEngineData, DeltaResult, Engine, Error, Table}; use futures::{stream::TryStreamExt, StreamExt}; use itertools::Itertools; use object_store::{local::LocalFileSystem, ObjectStore}; -use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder}; use crate::{TestCaseInfo, TestResult}; @@ -83,8 +86,8 @@ fn assert_schema_fields_match(schema: &Schema, golden: &Schema) { fn normalize_col(col: Arc) -> Arc { if let DataType::Timestamp(unit, Some(zone)) = col.data_type() { if **zone == *"+00:00" { - arrow_cast::cast::cast(&col, &DataType::Timestamp(*unit, Some("UTC".into()))) - .expect("Could not cast to UTC") + let data_type = DataType::Timestamp(*unit, Some("UTC".into())); + delta_kernel::arrow::compute::cast(&col, &data_type).expect("Could not cast to UTC") } else { col } diff --git a/feature-tests/Cargo.toml b/feature-tests/Cargo.toml index 7e45e41e2..43f3773a7 100644 --- a/feature-tests/Cargo.toml +++ b/feature-tests/Cargo.toml @@ -12,7 +12,7 @@ version.workspace = true release = false [dependencies] -delta_kernel = { path = "../kernel" } +delta_kernel = { path = "../kernel", features = ["arrow_53"] } [features] default-engine = [ "delta_kernel/default-engine" ] diff --git a/ffi/Cargo.toml b/ffi/Cargo.toml index aa4edc167..d588427b0 100644 --- a/ffi/Cargo.toml +++ b/ffi/Cargo.toml @@ -22,21 +22,13 @@ 
tracing-core = { version = "0.1", optional = true } tracing-subscriber = { version = "0.3", optional = true, features = [ "json" ] } url = "2" delta_kernel = { path = "../kernel", default-features = false, features = [ + "arrow", "developer-visibility", ] } delta_kernel_ffi_macros = { path = "../ffi-proc-macros", version = "0.6.1" } -# used if we use the default engine to be able to move arrow data into the c-ffi format -arrow-schema = { version = ">=53, <55", default-features = false, features = [ - "ffi", -], optional = true } -arrow-data = { version = ">=53, <55", default-features = false, features = [ - "ffi", -], optional = true } -arrow-array = { version = ">=53, <55", default-features = false, optional = true } - [build-dependencies] -cbindgen = "0.27.0" +cbindgen = "0.28" libc = "0.2.158" [dev-dependencies] @@ -52,9 +44,6 @@ default = ["default-engine"] cloud = ["delta_kernel/cloud"] default-engine = [ "delta_kernel/default-engine", - "arrow-array", - "arrow-data", - "arrow-schema", ] tracing = [ "tracing-core", "tracing-subscriber" ] sync-engine = ["delta_kernel/sync-engine"] diff --git a/ffi/cbindgen.toml b/ffi/cbindgen.toml index 491333ac1..9f9fe9099 100644 --- a/ffi/cbindgen.toml +++ b/ffi/cbindgen.toml @@ -25,4 +25,4 @@ parse_deps = true # only crates found in this list will ever be parsed. # # default: there is no allow-list (NOTE: this is the opposite of []) -include = ["delta_kernel", "arrow-data", "arrow-schema"] +include = ["arrow", "arrow-data", "arrow-schema", "delta_kernel"] diff --git a/ffi/src/engine_data.rs b/ffi/src/engine_data.rs index 3363c9034..01eaaa343 100644 --- a/ffi/src/engine_data.rs +++ b/ffi/src/engine_data.rs @@ -1,5 +1,9 @@ //! EngineData related ffi code +use delta_kernel::arrow::array::{ + ffi::{FFI_ArrowArray, FFI_ArrowSchema}, + ArrayData, StructArray, +}; use delta_kernel::{DeltaResult, EngineData}; use std::ffi::c_void; @@ -45,8 +49,8 @@ unsafe fn get_raw_engine_data_impl(data: &mut Handle) -> &m #[cfg(feature = "default-engine")] #[repr(C)] pub struct ArrowFFIData { - pub array: arrow_data::ffi::FFI_ArrowArray, - pub schema: arrow_schema::ffi::FFI_ArrowSchema, + pub array: FFI_ArrowArray, + pub schema: FFI_ArrowSchema, } // TODO: This should use a callback to avoid having to have the engine free the struct @@ -71,16 +75,16 @@ pub unsafe extern "C" fn get_raw_arrow_data( // TODO: This method leaks the returned pointer memory. How will the engine free it? #[cfg(feature = "default-engine")] fn get_raw_arrow_data_impl(data: Box) -> DeltaResult<*mut ArrowFFIData> { - let record_batch: arrow_array::RecordBatch = data + let record_batch: delta_kernel::arrow::array::RecordBatch = data .into_any() .downcast::() .map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))? .into(); - let sa: arrow_array::StructArray = record_batch.into(); - let array_data: arrow_data::ArrayData = sa.into(); + let sa: StructArray = record_batch.into(); + let array_data: ArrayData = sa.into(); // these call `clone`. is there a way to not copy anything and what exactly are they cloning? 
- let array = arrow_data::ffi::FFI_ArrowArray::new(&array_data); - let schema = arrow_schema::ffi::FFI_ArrowSchema::try_from(array_data.data_type())?; + let array = FFI_ArrowArray::new(&array_data); + let schema = FFI_ArrowSchema::try_from(array_data.data_type())?; let ret_data = Box::new(ArrowFFIData { array, schema }); Ok(Box::leak(ret_data)) } diff --git a/integration-tests/Cargo.toml b/integration-tests/Cargo.toml index cc0a5abd1..02e924260 100644 --- a/integration-tests/Cargo.toml +++ b/integration-tests/Cargo.toml @@ -6,19 +6,4 @@ edition = "2021" [workspace] [dependencies] -arrow = "=53.0.0" -delta_kernel = { path = "../kernel", features = ["arrow-conversion", "arrow-expression", "default-engine", "sync-engine"] } - -[patch.'file:///../kernel'] -arrow = "=53.0.0" -arrow-arith = "=53.0.0" -arrow-array = "=53.0.0" -arrow-buffer = "=53.0.0" -arrow-cast = "=53.0.0" -arrow-data = "=53.0.0" -arrow-ord = "=53.0.0" -arrow-json = "=53.0.0" -arrow-select = "=53.0.0" -arrow-schema = "=53.0.0" -parquet = "=53.0.0" -object_store = "=0.11.1" +delta_kernel = { path = "../kernel", features = ["default-engine", "sync-engine"] } diff --git a/integration-tests/src/main.rs b/integration-tests/src/main.rs index 55a809e8c..db26d0e4d 100644 --- a/integration-tests/src/main.rs +++ b/integration-tests/src/main.rs @@ -1,15 +1,16 @@ -fn create_arrow_schema() -> arrow::datatypes::Schema { - use arrow::datatypes::{DataType, Field, Schema}; +use delta_kernel::arrow::datatypes::{DataType, Field, Schema}; + +fn create_arrow_schema() -> Schema { let field_a = Field::new("a", DataType::Int64, false); let field_b = Field::new("b", DataType::Boolean, false); Schema::new(vec![field_a, field_b]) } fn create_kernel_schema() -> delta_kernel::schema::Schema { - use delta_kernel::schema::{DataType, Schema, StructField}; + use delta_kernel::schema::{DataType, StructField}; let field_a = StructField::not_null("a", DataType::LONG); let field_b = StructField::not_null("b", DataType::BOOLEAN); - Schema::new(vec![field_a, field_b]) + delta_kernel::schema::Schema::new(vec![field_a, field_b]) } fn main() { diff --git a/integration-tests/test-all-arrow-versions.sh b/integration-tests/test-all-arrow-versions.sh index 35c8fdc7d..13fa42618 100755 --- a/integration-tests/test-all-arrow-versions.sh +++ b/integration-tests/test-all-arrow-versions.sh @@ -2,38 +2,25 @@ set -eu -o pipefail -is_version_le() { - [ "$1" = "$(echo -e "$1\n$2" | sort -V | head -n1)" ] -} - -is_version_lt() { - if [ "$1" = "$2" ] - then - return 1 - else - is_version_le "$1" "$2" - fi -} - test_arrow_version() { ARROW_VERSION="$1" echo "== Testing version $ARROW_VERSION ==" - sed -i'' -e "s/\(arrow[^\"]*=[^\"]*\).*/\1\"=$ARROW_VERSION\"/" Cargo.toml - sed -i'' -e "s/\(parquet[^\"]*\).*/\1\"=$ARROW_VERSION\"/" Cargo.toml cargo clean rm -f Cargo.lock cargo update cat Cargo.toml - cargo run + cargo run --features ${ARROW_VERSION} } -MIN_ARROW_VER="53.0.0" -MAX_ARROW_VER="54.0.0" +FEATURES=$(cat ../kernel/Cargo.toml | grep -e ^arrow_ | awk '{ print $1 }' | sort -u) -for ARROW_VERSION in $(curl -s https://crates.io/api/v1/crates/arrow | jq -r '.versions[].num' | tr -d '\r') + +echo "[features]" >> Cargo.toml + +for ARROW_VERSION in ${FEATURES} do - if ! 
is_version_lt "$ARROW_VERSION" "$MIN_ARROW_VER" && is_version_lt "$ARROW_VERSION" "$MAX_ARROW_VER" - then - test_arrow_version "$ARROW_VERSION" - fi + echo "${ARROW_VERSION} = [\"delta_kernel/${ARROW_VERSION}\"]" >> Cargo.toml + test_arrow_version $ARROW_VERSION done + +git checkout Cargo.toml diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index 1431b1ff1..01446e471 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -58,20 +58,22 @@ visibility = "0.1.1" # Used in the sync engine tempfile = { version = "3", optional = true } + +# Arrow supported versions +## 53 # Used in default engine -arrow-buffer = { workspace = true, optional = true } -arrow-array = { workspace = true, optional = true, features = ["chrono-tz"] } -arrow-select = { workspace = true, optional = true } -arrow-arith = { workspace = true, optional = true } -arrow-cast = { workspace = true, optional = true } -arrow-json = { workspace = true, optional = true } -arrow-ord = { workspace = true, optional = true } -arrow-schema = { workspace = true, optional = true } +arrow_53 = { package = "arrow", version = "53", features = ["chrono-tz", "ffi", "json", "prettyprint"], optional = true } +# Used in default and sync engine +parquet_53 = { package = "parquet", version = "53", features = ["async", "object_store"] , optional = true } +###### +## 54 +arrow_54 = { package = "arrow", version = "54", features = ["chrono-tz", "ffi", "json", "prettyprint"], optional = true } +parquet_54 = { package = "parquet", version = "54", features = ["async", "object_store"] , optional = true } +###### + futures = { version = "0.3", optional = true } object_store = { workspace = true, optional = true } hdfs-native-object-store = { workspace = true, optional = true } -# Used in default and sync engine -parquet = { workspace = true, optional = true } # Used for fetching direct urls (like pre-signed urls) reqwest = { version = "0.12.8", default-features = false, optional = true } strum = { version = "0.26", features = ["derive"] } @@ -85,14 +87,16 @@ hdfs-native = { workspace = true, optional = true } walkdir = { workspace = true, optional = true } [features] -arrow-conversion = ["arrow-schema"] -arrow-expression = [ - "arrow-arith", - "arrow-array", - "arrow-buffer", - "arrow-ord", - "arrow-schema", -] +# The default version to be expected +arrow = ["arrow_53"] + +arrow_53 = ["dep:arrow_53", "dep:parquet_53"] + +arrow_54 = ["dep:arrow_54", "dep:parquet_54"] + +arrow-conversion = [] +arrow-expression = [] + cloud = [ "object_store/aws", "object_store/azure", @@ -107,16 +111,8 @@ default = [] default-engine-base = [ "arrow-conversion", "arrow-expression", - "arrow-array", - "arrow-buffer", - "arrow-cast", - "arrow-json", - "arrow-schema", - "arrow-select", "futures", "object_store", - "parquet/async", - "parquet/object_store", "tokio", "uuid/v4", "uuid/fast-rng", @@ -134,13 +130,6 @@ default-engine-rustls = [ developer-visibility = [] sync-engine = [ - "arrow-cast", - "arrow-conversion", - "arrow-expression", - "arrow-array", - "arrow-json", - "arrow-select", - "parquet", "tempfile", ] integration-test = [ @@ -156,8 +145,7 @@ version = "=0.5.9" rustc_version = "0.4.1" [dev-dependencies] -arrow = { workspace = true, features = ["json", "prettyprint"] } -delta_kernel = { path = ".", features = ["default-engine", "sync-engine"] } +delta_kernel = { path = ".", features = ["arrow", "default-engine", "sync-engine"] } test_utils = { path = "../test-utils" } paste = "1.0" test-log = { version = "0.2", default-features = false, features = ["trace"] } diff 
--git a/kernel/examples/inspect-table/Cargo.toml b/kernel/examples/inspect-table/Cargo.toml index b81a8ac5b..4208c6938 100644 --- a/kernel/examples/inspect-table/Cargo.toml +++ b/kernel/examples/inspect-table/Cargo.toml @@ -5,11 +5,11 @@ edition = "2021" publish = false [dependencies] -arrow-array = { workspace = true } -arrow-schema = { workspace = true } +arrow = "53" clap = { version = "4.5", features = ["derive"] } delta_kernel = { path = "../../../kernel", features = [ "cloud", + "arrow_53", "default-engine", "developer-visibility", ] } diff --git a/kernel/examples/read-table-changes/Cargo.toml b/kernel/examples/read-table-changes/Cargo.toml index 181da7dc6..35f077bc2 100644 --- a/kernel/examples/read-table-changes/Cargo.toml +++ b/kernel/examples/read-table-changes/Cargo.toml @@ -8,14 +8,12 @@ publish = false release = false [dependencies] -arrow-array = { workspace = true } -arrow-schema = { workspace = true } clap = { version = "4.5", features = ["derive"] } delta_kernel = { path = "../../../kernel", features = [ "cloud", + "arrow", "default-engine", ] } env_logger = "0.11.3" url = "2" itertools = "0.13" -arrow = { workspace = true, features = ["prettyprint"] } diff --git a/kernel/examples/read-table-changes/src/main.rs b/kernel/examples/read-table-changes/src/main.rs index 3360a06cf..ddafc1554 100644 --- a/kernel/examples/read-table-changes/src/main.rs +++ b/kernel/examples/read-table-changes/src/main.rs @@ -1,8 +1,8 @@ use std::{collections::HashMap, sync::Arc}; -use arrow::{compute::filter_record_batch, util::pretty::print_batches}; -use arrow_array::RecordBatch; use clap::Parser; +use delta_kernel::arrow::array::RecordBatch; +use delta_kernel::arrow::{compute::filter_record_batch, util::pretty::print_batches}; use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; diff --git a/kernel/examples/read-table-multi-threaded/Cargo.toml b/kernel/examples/read-table-multi-threaded/Cargo.toml index 3362e579a..8cb7c9cd3 100644 --- a/kernel/examples/read-table-multi-threaded/Cargo.toml +++ b/kernel/examples/read-table-multi-threaded/Cargo.toml @@ -5,10 +5,11 @@ edition = "2021" publish = false [dependencies] -arrow = { workspace = true, features = ["prettyprint", "chrono-tz"] } +arrow = { version = "53", features = ["prettyprint", "chrono-tz"] } clap = { version = "4.5", features = ["derive"] } delta_kernel = { path = "../../../kernel", features = [ "cloud", + "arrow_53", "default-engine", "sync-engine", "developer-visibility", diff --git a/kernel/examples/read-table-single-threaded/Cargo.toml b/kernel/examples/read-table-single-threaded/Cargo.toml index dc0458139..e71959e7b 100644 --- a/kernel/examples/read-table-single-threaded/Cargo.toml +++ b/kernel/examples/read-table-single-threaded/Cargo.toml @@ -5,9 +5,10 @@ edition = "2021" publish = false [dependencies] -arrow = { workspace = true, features = ["prettyprint", "chrono-tz"] } +arrow = { version = "53", features = ["prettyprint", "chrono-tz"] } clap = { version = "4.5", features = ["derive"] } delta_kernel = { path = "../../../kernel", features = [ + "arrow_53", "cloud", "default-engine", "sync-engine", diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs index 9f34bd2c5..72747ac6a 100644 --- a/kernel/src/actions/visitors.rs +++ b/kernel/src/actions/visitors.rs @@ -514,8 +514,8 @@ pub(crate) fn visit_deletion_vector_at<'a>( mod tests { use std::sync::Arc; - use 
arrow_array::{RecordBatch, StringArray}; - use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + use crate::arrow::array::{RecordBatch, StringArray}; + use crate::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use super::*; use crate::{ diff --git a/kernel/src/arrow.rs b/kernel/src/arrow.rs new file mode 100644 index 000000000..ccae93013 --- /dev/null +++ b/kernel/src/arrow.rs @@ -0,0 +1,11 @@ +//! This module exists to help re-export the version of arrow used by default-engine and other +//! parts of kernel that need arrow + +#[cfg(all(feature = "arrow_53", feature = "arrow_54"))] +compile_error!("Multiple versions of the arrow cannot be used at the same time!"); + +#[cfg(feature = "arrow_53")] +pub use arrow_53::*; + +#[cfg(feature = "arrow_54")] +pub use arrow_54::*; diff --git a/kernel/src/engine/arrow_conversion.rs b/kernel/src/engine/arrow_conversion.rs index 0b905ff3a..a425cd143 100644 --- a/kernel/src/engine/arrow_conversion.rs +++ b/kernel/src/engine/arrow_conversion.rs @@ -2,10 +2,11 @@ use std::sync::Arc; -use arrow_schema::{ - ArrowError, DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, +use crate::arrow::datatypes::{ + DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit, }; +use crate::arrow::error::ArrowError; use itertools::Itertools; use crate::error::Error; diff --git a/kernel/src/engine/arrow_data.rs b/kernel/src/engine/arrow_data.rs index 000d62328..988380901 100644 --- a/kernel/src/engine/arrow_data.rs +++ b/kernel/src/engine/arrow_data.rs @@ -2,12 +2,12 @@ use crate::engine_data::{EngineData, EngineList, EngineMap, GetData, RowVisitor} use crate::schema::{ColumnName, DataType}; use crate::{DeltaResult, Error}; -use arrow_array::cast::AsArray; -use arrow_array::types::{Int32Type, Int64Type}; -use arrow_array::{ +use crate::arrow::array::cast::AsArray; +use crate::arrow::array::types::{Int32Type, Int64Type}; +use crate::arrow::array::{ Array, ArrayRef, GenericListArray, MapArray, OffsetSizeTrait, RecordBatch, StructArray, }; -use arrow_schema::{DataType as ArrowDataType, FieldRef}; +use crate::arrow::datatypes::{DataType as ArrowDataType, FieldRef}; use tracing::debug; use std::collections::{HashMap, HashSet}; @@ -296,8 +296,8 @@ impl ArrowEngineData { mod tests { use std::sync::Arc; - use arrow_array::{RecordBatch, StringArray}; - use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + use crate::arrow::array::{RecordBatch, StringArray}; + use crate::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use crate::{ actions::{get_log_schema, Metadata, Protocol}, diff --git a/kernel/src/engine/arrow_expression.rs b/kernel/src/engine/arrow_expression.rs index 8ee54ebd0..b7a845171 100644 --- a/kernel/src/engine/arrow_expression.rs +++ b/kernel/src/engine/arrow_expression.rs @@ -3,23 +3,24 @@ use std::borrow::Borrow; use std::collections::HashMap; use std::sync::Arc; -use arrow_arith::boolean::{and_kleene, is_null, not, or_kleene}; -use arrow_arith::numeric::{add, div, mul, sub}; -use arrow_array::cast::AsArray; -use arrow_array::{types::*, MapArray}; -use arrow_array::{ +use crate::arrow::array::AsArray; +use crate::arrow::array::{types::*, MapArray}; +use crate::arrow::array::{ Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Datum, Decimal128Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray, RecordBatch, StringArray, StructArray, TimestampMicrosecondArray, }; -use arrow_buffer::OffsetBuffer; -use 
arrow_ord::cmp::{distinct, eq, gt, gt_eq, lt, lt_eq, neq}; -use arrow_ord::comparison::in_list_utf8; -use arrow_schema::{ - ArrowError, DataType as ArrowDataType, Field as ArrowField, Fields, IntervalUnit, - Schema as ArrowSchema, TimeUnit, +use crate::arrow::buffer::OffsetBuffer; +use crate::arrow::compute::concat; +use crate::arrow::compute::kernels::cmp::{distinct, eq, gt, gt_eq, lt, lt_eq, neq}; +use crate::arrow::compute::kernels::comparison::in_list_utf8; +use crate::arrow::compute::kernels::numeric::{add, div, mul, sub}; +use crate::arrow::compute::{and_kleene, is_null, not, or_kleene}; +use crate::arrow::datatypes::{ + DataType as ArrowDataType, Field as ArrowField, Fields, IntervalUnit, Schema as ArrowSchema, + TimeUnit, }; -use arrow_select::concat::concat; +use crate::arrow::error::ArrowError; use itertools::Itertools; use super::arrow_conversion::LIST_ARRAY_ROOT; @@ -568,9 +569,9 @@ impl ExpressionEvaluator for DefaultExpressionEvaluator { mod tests { use std::ops::{Add, Div, Mul, Sub}; - use arrow_array::{GenericStringArray, Int32Array}; - use arrow_buffer::ScalarBuffer; - use arrow_schema::{DataType, Field, Fields, Schema}; + use crate::arrow::array::{GenericStringArray, Int32Array}; + use crate::arrow::buffer::ScalarBuffer; + use crate::arrow::datatypes::{DataType, Field, Fields, Schema}; use super::*; use crate::expressions::*; diff --git a/kernel/src/engine/arrow_get_data.rs b/kernel/src/engine/arrow_get_data.rs index 145aab66b..fbed64df1 100644 --- a/kernel/src/engine/arrow_get_data.rs +++ b/kernel/src/engine/arrow_get_data.rs @@ -1,4 +1,4 @@ -use arrow_array::{ +use crate::arrow::array::{ types::{GenericStringType, Int32Type, Int64Type}, Array, BooleanArray, GenericByteArray, GenericListArray, MapArray, OffsetSizeTrait, PrimitiveArray, diff --git a/kernel/src/engine/arrow_utils.rs b/kernel/src/engine/arrow_utils.rs index e16303cf3..749f1399c 100644 --- a/kernel/src/engine/arrow_utils.rs +++ b/kernel/src/engine/arrow_utils.rs @@ -12,19 +12,19 @@ use crate::{ DeltaResult, EngineData, Error, }; -use arrow_array::{ +use crate::arrow::array::{ cast::AsArray, make_array, new_null_array, Array as ArrowArray, GenericListArray, OffsetSizeTrait, RecordBatch, StringArray, StructArray, }; -use arrow_buffer::NullBuffer; -use arrow_json::{LineDelimitedWriter, ReaderBuilder}; -use arrow_schema::{ +use crate::arrow::buffer::NullBuffer; +use crate::arrow::compute::concat_batches; +use crate::arrow::datatypes::{ DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef, Fields, SchemaRef as ArrowSchemaRef, }; -use arrow_select::concat::concat_batches; +use crate::arrow::json::{LineDelimitedWriter, ReaderBuilder}; +use crate::parquet::{arrow::ProjectionMask, schema::types::SchemaDescriptor}; use itertools::Itertools; -use parquet::{arrow::ProjectionMask, schema::types::SchemaDescriptor}; use tracing::debug; macro_rules! prim_array_cmp { @@ -41,7 +41,7 @@ macro_rules! prim_array_cmp { .ok_or(Error::invalid_expression( format!("Cannot cast to list array: {}", $right_arr.data_type())) )?; - arrow_ord::comparison::in_list(prim_array, list_array).map(wrap_comparison_result) + crate::arrow::compute::kernels::comparison::in_list(prim_array, list_array).map(wrap_comparison_result) } )+ _ => Err(ArrowError::CastError( @@ -60,7 +60,10 @@ pub(crate) use prim_array_cmp; /// returns a tuples of (mask_indices: Vec, reorder_indices: /// Vec). 
`mask_indices` is used for generating the mask for reading from the pub(crate) fn make_arrow_error(s: impl Into) -> Error { - Error::Arrow(arrow_schema::ArrowError::InvalidArgumentError(s.into())).with_backtrace() + Error::Arrow(crate::arrow::error::ArrowError::InvalidArgumentError( + s.into(), + )) + .with_backtrace() } /// Applies post-processing to data read from parquet files. This includes `reorder_struct_array` to @@ -516,7 +519,7 @@ pub(crate) fn reorder_struct_array( match &reorder_index.transform { ReorderIndexTransform::Cast(target) => { let col = input_cols[parquet_position].as_ref(); - let col = Arc::new(arrow_cast::cast::cast(col, target)?); + let col = Arc::new(crate::arrow::compute::cast(col, target)?); let new_field = Arc::new( input_fields[parquet_position] .as_ref() @@ -742,17 +745,17 @@ pub(crate) fn to_json_bytes( mod tests { use std::sync::Arc; - use arrow::{ - array::AsArray, - buffer::{OffsetBuffer, ScalarBuffer}, - }; - use arrow_array::{ + use crate::arrow::array::{ Array, ArrayRef as ArrowArrayRef, BooleanArray, GenericListArray, Int32Array, StructArray, }; - use arrow_schema::{ + use crate::arrow::datatypes::{ DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, }; + use crate::arrow::{ + array::AsArray, + buffer::{OffsetBuffer, ScalarBuffer}, + }; use crate::schema::{ArrayType, DataType, MapType, StructField, StructType}; @@ -1498,9 +1501,9 @@ mod tests { #[test] fn test_arrow_broken_nested_null_masks() { + use crate::arrow::datatypes::{DataType, Field, Fields, Schema}; use crate::engine::arrow_utils::fix_nested_null_masks; - use arrow::datatypes::{DataType, Field, Fields, Schema}; - use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use crate::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; // Parse some JSON into a nested schema let schema = Arc::new(Schema::new(vec![Field::new( @@ -1532,7 +1535,7 @@ mod tests { { "outer" : { "inner_non_null" : { "leaf_non_null" : 4 }, "inner_nullable" : { "leaf_non_null" : 5 } } } { "outer" : { "inner_non_null" : { "leaf_non_null" : 6 }, "inner_nullable" : { "leaf_non_null" : 7, "leaf_nullable": 8 } } } "#; - let batch1 = arrow::json::ReaderBuilder::new(schema.clone()) + let batch1 = crate::arrow::json::ReaderBuilder::new(schema.clone()) .build(json_string.as_bytes()) .unwrap() .next() @@ -1567,7 +1570,7 @@ mod tests { // Write the batch to a parquet file and read it back let mut buffer = vec![]; let mut writer = - parquet::arrow::ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap(); + crate::parquet::arrow::ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap(); writer.write(&batch1).unwrap(); writer.close().unwrap(); // writer must be closed to write footer let batch2 = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::from(buffer)) diff --git a/kernel/src/engine/default/file_stream.rs b/kernel/src/engine/default/file_stream.rs index 075716a75..bcdc370a0 100644 --- a/kernel/src/engine/default/file_stream.rs +++ b/kernel/src/engine/default/file_stream.rs @@ -5,8 +5,8 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; -use arrow_array::RecordBatch; -use arrow_schema::SchemaRef as ArrowSchemaRef; +use crate::arrow::array::RecordBatch; +use crate::arrow::datatypes::SchemaRef as ArrowSchemaRef; use futures::future::BoxFuture; use futures::stream::{BoxStream, Stream, StreamExt}; use futures::FutureExt; diff --git a/kernel/src/engine/default/json.rs b/kernel/src/engine/default/json.rs 
index ab296e12a..1a9bc5f74 100644 --- a/kernel/src/engine/default/json.rs +++ b/kernel/src/engine/default/json.rs @@ -5,8 +5,8 @@ use std::ops::Range; use std::sync::Arc; use std::task::{ready, Poll}; -use arrow_json::ReaderBuilder; -use arrow_schema::SchemaRef as ArrowSchemaRef; +use crate::arrow::datatypes::SchemaRef as ArrowSchemaRef; +use crate::arrow::json::ReaderBuilder; use bytes::{Buf, Bytes}; use futures::{StreamExt, TryStreamExt}; use object_store::path::Path; @@ -201,8 +201,8 @@ impl FileOpener for JsonOpener { mod tests { use std::path::PathBuf; - use arrow::array::{AsArray, RecordBatch, StringArray}; - use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + use crate::arrow::array::{AsArray, RecordBatch, StringArray}; + use crate::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use itertools::Itertools; use object_store::{local::LocalFileSystem, ObjectStore}; diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index 50c816d3b..f344ccd86 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -4,16 +4,16 @@ use std::collections::HashMap; use std::ops::Range; use std::sync::Arc; -use arrow_array::builder::{MapBuilder, MapFieldNames, StringBuilder}; -use arrow_array::{BooleanArray, Int64Array, RecordBatch, StringArray}; +use crate::arrow::array::builder::{MapBuilder, MapFieldNames, StringBuilder}; +use crate::arrow::array::{BooleanArray, Int64Array, RecordBatch, StringArray}; +use crate::parquet::arrow::arrow_reader::{ + ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, +}; +use crate::parquet::arrow::arrow_writer::ArrowWriter; +use crate::parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder}; use futures::StreamExt; use object_store::path::Path; use object_store::DynObjectStore; -use parquet::arrow::arrow_reader::{ - ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, -}; -use parquet::arrow::arrow_writer::ArrowWriter; -use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder}; use uuid::Uuid; use super::file_stream::{FileOpenFuture, FileOpener, FileStream}; @@ -361,8 +361,7 @@ mod tests { use std::path::PathBuf; use std::time::{SystemTime, UNIX_EPOCH}; - use arrow_array::array::Array; - use arrow_array::RecordBatch; + use crate::arrow::array::{Array, RecordBatch}; use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore}; use url::Url; diff --git a/kernel/src/engine/ensure_data_types.rs b/kernel/src/engine/ensure_data_types.rs index b6f186671..da699be07 100644 --- a/kernel/src/engine/ensure_data_types.rs +++ b/kernel/src/engine/ensure_data_types.rs @@ -5,7 +5,7 @@ use std::{ ops::Deref, }; -use arrow_schema::{DataType as ArrowDataType, Field as ArrowField}; +use crate::arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField}; use itertools::Itertools; use crate::{ @@ -256,7 +256,7 @@ fn metadata_eq( #[cfg(test)] mod tests { - use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Fields}; + use crate::arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField, Fields}; use crate::{ engine::ensure_data_types::ensure_data_types, @@ -276,8 +276,8 @@ mod tests { assert!(can_upcast_to_decimal(&Decimal128(5, 1), 6u8, 2i8)); assert!(can_upcast_to_decimal( &Decimal128(10, 5), - arrow_schema::DECIMAL128_MAX_PRECISION, - arrow_schema::DECIMAL128_MAX_SCALE - 5 + crate::arrow::datatypes::DECIMAL128_MAX_PRECISION, + 
crate::arrow::datatypes::DECIMAL128_MAX_SCALE - 5 )); assert!(can_upcast_to_decimal(&Int8, 3u8, 0i8)); diff --git a/kernel/src/engine/parquet_row_group_skipping.rs b/kernel/src/engine/parquet_row_group_skipping.rs index 79c87d923..fbce2f913 100644 --- a/kernel/src/engine/parquet_row_group_skipping.rs +++ b/kernel/src/engine/parquet_row_group_skipping.rs @@ -2,13 +2,13 @@ use crate::expressions::{ BinaryExpression, ColumnName, Expression, Scalar, UnaryExpression, VariadicExpression, }; +use crate::parquet::arrow::arrow_reader::ArrowReaderBuilder; +use crate::parquet::file::metadata::RowGroupMetaData; +use crate::parquet::file::statistics::Statistics; +use crate::parquet::schema::types::ColumnDescPtr; use crate::predicates::parquet_stats_skipping::ParquetStatsProvider; use crate::schema::{DataType, PrimitiveType}; use chrono::{DateTime, Days}; -use parquet::arrow::arrow_reader::ArrowReaderBuilder; -use parquet::file::metadata::RowGroupMetaData; -use parquet::file::statistics::Statistics; -use parquet::schema::types::ColumnDescPtr; use std::collections::{HashMap, HashSet}; use tracing::debug; diff --git a/kernel/src/engine/parquet_row_group_skipping/tests.rs b/kernel/src/engine/parquet_row_group_skipping/tests.rs index 37a3bb1b0..3f3bb8108 100644 --- a/kernel/src/engine/parquet_row_group_skipping/tests.rs +++ b/kernel/src/engine/parquet_row_group_skipping/tests.rs @@ -1,8 +1,8 @@ use super::*; use crate::expressions::{column_expr, column_name}; +use crate::parquet::arrow::arrow_reader::ArrowReaderMetadata; use crate::predicates::DataSkippingPredicateEvaluator as _; use crate::Expression; -use parquet::arrow::arrow_reader::ArrowReaderMetadata; use std::fs::File; /// Performs an exhaustive set of reads against a specially crafted parquet file. diff --git a/kernel/src/engine/sync/json.rs b/kernel/src/engine/sync/json.rs index 3d33b1025..ddf61bd3c 100644 --- a/kernel/src/engine/sync/json.rs +++ b/kernel/src/engine/sync/json.rs @@ -1,6 +1,7 @@ use std::{fs::File, io::BufReader, io::Write}; -use arrow_schema::SchemaRef as ArrowSchemaRef; +use crate::arrow::datatypes::SchemaRef as ArrowSchemaRef; +use crate::arrow::json::ReaderBuilder; use tempfile::NamedTempFile; use url::Url; @@ -22,7 +23,7 @@ fn try_create_from_json( arrow_schema: ArrowSchemaRef, _predicate: Option, ) -> DeltaResult>> { - let json = arrow_json::ReaderBuilder::new(arrow_schema) + let json = ReaderBuilder::new(arrow_schema) .build(BufReader::new(file))? 
.map(|data| Ok(ArrowEngineData::new(data?))); Ok(json) @@ -92,10 +93,8 @@ mod tests { use std::sync::Arc; - use arrow_array::{RecordBatch, StringArray}; - use arrow_schema::DataType as ArrowDataType; - use arrow_schema::Field; - use arrow_schema::Schema as ArrowSchema; + use crate::arrow::array::{RecordBatch, StringArray}; + use crate::arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema}; use serde_json::json; use url::Url; diff --git a/kernel/src/engine/sync/mod.rs b/kernel/src/engine/sync/mod.rs index f637ec105..ae80c23bd 100644 --- a/kernel/src/engine/sync/mod.rs +++ b/kernel/src/engine/sync/mod.rs @@ -7,7 +7,7 @@ use crate::{ FileMeta, FileSystemClient, JsonHandler, ParquetHandler, SchemaRef, }; -use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; +use crate::arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use itertools::Itertools; use std::fs::File; use std::sync::Arc; diff --git a/kernel/src/engine/sync/parquet.rs b/kernel/src/engine/sync/parquet.rs index 8714c694f..48010af30 100644 --- a/kernel/src/engine/sync/parquet.rs +++ b/kernel/src/engine/sync/parquet.rs @@ -1,7 +1,7 @@ use std::fs::File; -use arrow_schema::SchemaRef as ArrowSchemaRef; -use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReaderBuilder}; +use crate::arrow::datatypes::SchemaRef as ArrowSchemaRef; +use crate::parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReaderBuilder}; use super::read_files; use crate::engine::arrow_data::ArrowEngineData; diff --git a/kernel/src/error.rs b/kernel/src/error.rs index 815ef3e51..91e42821d 100644 --- a/kernel/src/error.rs +++ b/kernel/src/error.rs @@ -10,6 +10,9 @@ use crate::schema::{DataType, StructType}; use crate::table_properties::ParseIntervalError; use crate::Version; +#[cfg(any(feature = "default-engine-base", feature = "sync-engine"))] +use crate::arrow::error::ArrowError; + /// A [`std::result::Result`] that has the kernel [`Error`] as the error variant pub type DeltaResult = std::result::Result; @@ -29,7 +32,7 @@ pub enum Error { /// An error performing operations on arrow data #[cfg(any(feature = "default-engine-base", feature = "sync-engine"))] #[error(transparent)] - Arrow(arrow_schema::ArrowError), + Arrow(ArrowError), /// User tried to convert engine data to the wrong type #[error("Invalid engine data type. Could not convert to {0}")] @@ -58,10 +61,10 @@ pub enum Error { #[error("Internal error {0}. 
This is a kernel bug, please report.")] InternalError(String), - /// An error encountered while working with parquet data - #[cfg(feature = "parquet")] + /// An error encountered while working with parquet data + #[cfg(any(feature = "default-engine-base", feature = "sync-engine"))] #[error("Arrow error: {0}")] - Parquet(#[from] parquet::errors::ParquetError), + Parquet(#[from] crate::parquet::errors::ParquetError), /// An error interacting with the object_store crate // We don't use [#from] object_store::Error here as our From impl transforms @@ -304,8 +307,8 @@ from_with_backtrace!( ); #[cfg(any(feature = "default-engine-base", feature = "sync-engine"))] -impl From<arrow_schema::ArrowError> for Error { - fn from(value: arrow_schema::ArrowError) -> Self { +impl From<ArrowError> for Error { + fn from(value: ArrowError) -> Self { Self::Arrow(value).with_backtrace() } } diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 8dde21afe..65a0a6ab5 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -87,6 +87,8 @@ pub mod table_features; pub mod table_properties; pub mod transaction; +pub mod arrow; +pub mod parquet; pub(crate) mod predicates; pub(crate) mod utils; diff --git a/kernel/src/parquet.rs b/kernel/src/parquet.rs new file mode 100644 index 000000000..bc7eba68f --- /dev/null +++ b/kernel/src/parquet.rs @@ -0,0 +1,11 @@ +//! This module exists to help re-export the version of parquet used by default-engine and other +//! parts of kernel that need parquet + +#[cfg(all(feature = "arrow_53", feature = "arrow_54"))] +compile_error!("Multiple versions of the arrow cannot be used at the same time!"); + +#[cfg(feature = "arrow_53")] +pub use parquet_53::*; + +#[cfg(feature = "arrow_54")] +pub use parquet_54::*; diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 14e2ee50f..0672345eb 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -665,8 +665,8 @@ pub fn selection_vector( pub(crate) mod test_utils { use std::sync::Arc; - use arrow_array::{RecordBatch, StringArray}; - use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + use crate::arrow::array::{RecordBatch, StringArray}; + use crate::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use crate::{ actions::get_log_schema, diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs index d74c2456a..4905668a4 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/transaction.rs @@ -339,11 +339,11 @@ mod tests { use crate::schema::MapType; use crate::{ExpressionHandler, FileSystemClient, JsonHandler, ParquetHandler}; - use arrow::json::writer::LineDelimitedWriter; - use arrow::record_batch::RecordBatch; - use arrow_array::builder::StringBuilder; - use arrow_schema::Schema as ArrowSchema; - use arrow_schema::{DataType as ArrowDataType, Field}; + use crate::arrow::array::{MapArray, MapBuilder, MapFieldNames, StringArray, StringBuilder}; + use crate::arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema}; + use crate::arrow::error::ArrowError; + use crate::arrow::json::writer::LineDelimitedWriter; + use crate::arrow::record_batch::RecordBatch; struct ExprEngine(Arc); @@ -371,16 +371,15 @@ mod tests { } } - fn build_map(entries: Vec<(&str, &str)>) -> arrow_array::MapArray { + fn build_map(entries: Vec<(&str, &str)>) -> MapArray { let key_builder = StringBuilder::new(); let val_builder = StringBuilder::new(); - let names = arrow_array::builder::MapFieldNames { + let names = MapFieldNames { entry: "entries".to_string(), key: "key".to_string(), value: "value".to_string(), }; - let mut builder =
arrow_array::builder::MapBuilder::new(Some(names), key_builder, val_builder); + let mut builder = MapBuilder::new(Some(names), key_builder, val_builder); for (key, val) in entries { builder.keys().append_value(key); builder.values().append_value(val); @@ -494,7 +493,7 @@ mod tests { engine_commit_info_schema, vec![ Arc::new(map_array), - Arc::new(arrow_array::StringArray::from(vec!["some_string"])), + Arc::new(StringArray::from(vec!["some_string"])), ], )?; @@ -533,7 +532,7 @@ mod tests { )])); let commit_info_batch = RecordBatch::try_new( engine_commit_info_schema, - vec![Arc::new(arrow_array::StringArray::new_null(1))], + vec![Arc::new(StringArray::new_null(1))], )?; let _ = generate_commit_info( @@ -542,12 +541,9 @@ mod tests { &ArrowEngineData::new(commit_info_batch), ) .map_err(|e| match e { - Error::Arrow(arrow_schema::ArrowError::SchemaError(_)) => (), + Error::Arrow(ArrowError::SchemaError(_)) => (), Error::Backtraced { source, .. } - if matches!( - &*source, - Error::Arrow(arrow_schema::ArrowError::SchemaError(_)) - ) => {} + if matches!(&*source, Error::Arrow(ArrowError::SchemaError(_))) => {} _ => panic!("expected arrow schema error error, got {:?}", e), }); @@ -564,7 +560,7 @@ mod tests { )])); let commit_info_batch = RecordBatch::try_new( engine_commit_info_schema, - vec![Arc::new(arrow_array::StringArray::new_null(1))], + vec![Arc::new(StringArray::new_null(1))], )?; let _ = generate_commit_info( @@ -573,12 +569,9 @@ mod tests { &ArrowEngineData::new(commit_info_batch), ) .map_err(|e| match e { - Error::Arrow(arrow_schema::ArrowError::InvalidArgumentError(_)) => (), + Error::Arrow(ArrowError::InvalidArgumentError(_)) => (), Error::Backtraced { source, .. } - if matches!( - &*source, - Error::Arrow(arrow_schema::ArrowError::InvalidArgumentError(_)) - ) => {} + if matches!(&*source, Error::Arrow(ArrowError::InvalidArgumentError(_))) => {} _ => panic!("expected arrow invalid arg error, got {:?}", e), }); @@ -644,16 +637,16 @@ mod tests { ), true, )])); - use arrow_array::builder::StringBuilder; + let key_builder = StringBuilder::new(); let val_builder = StringBuilder::new(); - let names = arrow_array::builder::MapFieldNames { + let names = crate::arrow::array::MapFieldNames { entry: "entries".to_string(), key: "key".to_string(), value: "value".to_string(), }; let mut builder = - arrow_array::builder::MapBuilder::new(Some(names), key_builder, val_builder); + crate::arrow::array::MapBuilder::new(Some(names), key_builder, val_builder); builder.append(is_null).unwrap(); let array = builder.finish(); diff --git a/kernel/tests/cdf.rs b/kernel/tests/cdf.rs index 2560dc71d..069018951 100644 --- a/kernel/tests/cdf.rs +++ b/kernel/tests/cdf.rs @@ -1,7 +1,7 @@ use std::{error, sync::Arc}; -use arrow::compute::filter_record_batch; -use arrow_array::RecordBatch; +use delta_kernel::arrow::array::RecordBatch; +use delta_kernel::arrow::compute::filter_record_batch; use delta_kernel::engine::sync::SyncEngine; use itertools::Itertools; diff --git a/kernel/tests/common/mod.rs b/kernel/tests/common/mod.rs index a918695b7..4268f0626 100644 --- a/kernel/tests/common/mod.rs +++ b/kernel/tests/common/mod.rs @@ -1,6 +1,6 @@ -use arrow::compute::filter_record_batch; -use arrow::record_batch::RecordBatch; -use arrow::util::pretty::pretty_format_batches; +use delta_kernel::arrow::compute::filter_record_batch; +use delta_kernel::arrow::record_batch::RecordBatch; +use delta_kernel::arrow::util::pretty::pretty_format_batches; use itertools::Itertools; use crate::ArrowEngineData; @@ -24,7 +24,7 @@ macro_rules! 
sort_lines { #[macro_export] macro_rules! assert_batches_sorted_eq { ($expected_lines_sorted: expr, $CHUNKS: expr) => { - let formatted = arrow::util::pretty::pretty_format_batches($CHUNKS) + let formatted = delta_kernel::arrow::util::pretty::pretty_format_batches($CHUNKS) .unwrap() .to_string(); // fix for windows: \r\n --> diff --git a/kernel/tests/golden_tables.rs b/kernel/tests/golden_tables.rs index 120271ef2..2b1bc1a71 100644 --- a/kernel/tests/golden_tables.rs +++ b/kernel/tests/golden_tables.rs @@ -3,23 +3,23 @@ //! Data (golden tables) are stored in tests/golden_data/.tar.zst //! Each table directory has a table/ and expected/ subdirectory with the input/output respectively -use arrow::array::AsArray; -use arrow::{compute::filter_record_batch, record_batch::RecordBatch}; -use arrow_ord::sort::{lexsort_to_indices, SortColumn}; -use arrow_schema::{FieldRef, Schema}; -use arrow_select::{concat::concat_batches, take::take}; +use delta_kernel::arrow::array::{Array, AsArray, StructArray}; +use delta_kernel::arrow::compute::{concat_batches, take}; +use delta_kernel::arrow::compute::{lexsort_to_indices, SortColumn}; +use delta_kernel::arrow::datatypes::{DataType, FieldRef, Schema}; +use delta_kernel::arrow::{compute::filter_record_batch, record_batch::RecordBatch}; use itertools::Itertools; use paste::paste; use std::path::{Path, PathBuf}; use std::sync::Arc; +use delta_kernel::parquet::arrow::async_reader::{ + ParquetObjectReader, ParquetRecordBatchStreamBuilder, +}; use delta_kernel::{engine::arrow_data::ArrowEngineData, DeltaResult, Table}; use futures::{stream::TryStreamExt, StreamExt}; use object_store::{local::LocalFileSystem, ObjectStore}; -use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder}; -use arrow_array::{Array, StructArray}; -use arrow_schema::DataType; use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index 1ce9b9017..7c5aade64 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -3,21 +3,20 @@ use std::ops::Not; use std::path::PathBuf; use std::sync::Arc; -use arrow::compute::filter_record_batch; -use arrow_schema::SchemaRef as ArrowSchemaRef; -use arrow_select::concat::concat_batches; use delta_kernel::actions::deletion_vector::split_vector; +use delta_kernel::arrow::compute::{concat_batches, filter_record_batch}; +use delta_kernel::arrow::datatypes::SchemaRef as ArrowSchemaRef; use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::expressions::{column_expr, BinaryOperator, Expression, ExpressionRef}; +use delta_kernel::parquet::file::properties::{EnabledStatistics, WriterProperties}; use delta_kernel::scan::state::{transform_to_logical, visit_scan_files, DvInfo, Stats}; use delta_kernel::scan::Scan; use delta_kernel::schema::{DataType, Schema}; use delta_kernel::{Engine, FileMeta, Table}; use itertools::Itertools; use object_store::{memory::InMemory, path::Path, ObjectStore}; -use parquet::file::properties::{EnabledStatistics, WriterProperties}; use test_utils::{ actions_to_string, add_commit, generate_batch, generate_simple_batch, into_record_batch, record_batch_to_bytes, record_batch_to_bytes_with_props, IntoArray, TestAction, METADATA, diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 2ee6dfdd5..710999f52 100644 --- 
a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -1,10 +1,12 @@ use std::collections::HashMap; use std::sync::Arc; -use arrow::array::{Int32Array, StringArray}; -use arrow::record_batch::RecordBatch; -use arrow_schema::Schema as ArrowSchema; -use arrow_schema::{DataType as ArrowDataType, Field}; +use delta_kernel::arrow::array::{ + Int32Array, MapBuilder, MapFieldNames, StringArray, StringBuilder, +}; +use delta_kernel::arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema}; +use delta_kernel::arrow::error::ArrowError; +use delta_kernel::arrow::record_batch::RecordBatch; use itertools::Itertools; use object_store::local::LocalFileSystem; use object_store::memory::InMemory; @@ -120,15 +122,14 @@ fn new_commit_info() -> DeltaResult> { false, )])); - use arrow_array::builder::StringBuilder; let key_builder = StringBuilder::new(); let val_builder = StringBuilder::new(); - let names = arrow_array::builder::MapFieldNames { + let names = MapFieldNames { entry: "entries".to_string(), key: "key".to_string(), value: "value".to_string(), }; - let mut builder = arrow_array::builder::MapBuilder::new(Some(names), key_builder, val_builder); + let mut builder = MapBuilder::new(Some(names), key_builder, val_builder); builder.keys().append_value("engineInfo"); builder.values().append_value("default engine"); builder.append(true).unwrap(); @@ -349,7 +350,7 @@ async fn test_append() -> Result<(), Box> { let append_data = [[1, 2, 3], [4, 5, 6]].map(|data| -> DeltaResult<_> { let data = RecordBatch::try_new( Arc::new(schema.as_ref().try_into()?), - vec![Arc::new(arrow::array::Int32Array::from(data.to_vec()))], + vec![Arc::new(Int32Array::from(data.to_vec()))], )?; Ok(Box::new(ArrowEngineData::new(data))) }); @@ -441,9 +442,7 @@ async fn test_append() -> Result<(), Box> { test_read( &ArrowEngineData::new(RecordBatch::try_new( Arc::new(schema.as_ref().try_into()?), - vec![Arc::new(arrow::array::Int32Array::from(vec![ - 1, 2, 3, 4, 5, 6, - ]))], + vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]))], )?), &table, engine, @@ -487,7 +486,7 @@ async fn test_append_partitioned() -> Result<(), Box> { let append_data = [[1, 2, 3], [4, 5, 6]].map(|data| -> DeltaResult<_> { let data = RecordBatch::try_new( Arc::new(data_schema.as_ref().try_into()?), - vec![Arc::new(arrow::array::Int32Array::from(data.to_vec()))], + vec![Arc::new(Int32Array::from(data.to_vec()))], )?; Ok(Box::new(ArrowEngineData::new(data))) }); @@ -627,7 +626,7 @@ async fn test_append_invalid_schema() -> Result<(), Box> let append_data = [["a", "b"], ["c", "d"]].map(|data| -> DeltaResult<_> { let data = RecordBatch::try_new( Arc::new(data_schema.as_ref().try_into()?), - vec![Arc::new(arrow::array::StringArray::from(data.to_vec()))], + vec![Arc::new(StringArray::from(data.to_vec()))], )?; Ok(Box::new(ArrowEngineData::new(data))) }); @@ -653,12 +652,9 @@ async fn test_append_invalid_schema() -> Result<(), Box> let mut write_metadata = futures::future::join_all(tasks).await.into_iter().flatten(); assert!(write_metadata.all(|res| match res { - Err(KernelError::Arrow(arrow_schema::ArrowError::SchemaError(_))) => true, + Err(KernelError::Arrow(ArrowError::SchemaError(_))) => true, Err(KernelError::Backtraced { source, .. 
}) - if matches!( - &*source, - KernelError::Arrow(arrow_schema::ArrowError::SchemaError(_)) - ) => + if matches!(&*source, KernelError::Arrow(ArrowError::SchemaError(_))) => true, _ => false, })); diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml index 0a90e96ed..b602b2e68 100644 --- a/test-utils/Cargo.toml +++ b/test-utils/Cargo.toml @@ -12,9 +12,6 @@ version.workspace = true release = false [dependencies] -arrow-array = { workspace = true, features = ["chrono-tz"] } -arrow-schema = { workspace = true } -delta_kernel = { path = "../kernel", features = [ "default-engine" ] } +delta_kernel = { path = "../kernel", features = [ "default-engine" ] } itertools = "0.13.0" object_store = { workspace = true } -parquet = { workspace = true } diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 0aeee887d..e8747c539 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -2,14 +2,14 @@ use std::sync::Arc; -use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray}; -use arrow_schema::ArrowError; +use delta_kernel::arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray}; +use delta_kernel::arrow::error::ArrowError; use delta_kernel::engine::arrow_data::ArrowEngineData; +use delta_kernel::parquet::arrow::arrow_writer::ArrowWriter; +use delta_kernel::parquet::file::properties::WriterProperties; use delta_kernel::EngineData; use itertools::Itertools; use object_store::{path::Path, ObjectStore}; -use parquet::arrow::arrow_writer::ArrowWriter; -use parquet::file::properties::WriterProperties; /// A common useful initial metadata and protocol. Also includes a single commitInfo pub const METADATA: &str = r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}