Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataFusion TableProvider for memory arrays #384

Merged
merged 24 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
749 changes: 266 additions & 483 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ members = [
"pyvortex",
"vortex-array",
"vortex-buffer",
"vortex-datafusion",
"vortex-dtype",
"vortex-error",
"vortex-expr",
Expand Down Expand Up @@ -41,15 +42,20 @@ arrow-data = "51.0.0"
arrow-ipc = "51.0.0"
arrow-schema = "51.0.0"
arrow-select = "51.0.0"
async-trait = "0.1"
bindgen = "0.69.4"
bytes = "1.6.0"
bzip2 = "0.4.4"
cargo_metadata = "0.18.1"
criterion = { version = "0.5.1", features = ["html_reports"] }
croaring = "1.0.1"
csv = "1.3.0"
datafusion-common = "39.0.0"
datafusion-expr = "39.0.0"
datafusion = "37.1.0"
datafusion-common = "37.1.0"
datafusion-execution = "37.1.0"
datafusion-expr = "37.1.0"
datafusion-physical-expr = "37.1.0"
datafusion-physical-plan = "37.1.0"
derive_builder = "0.20.0"
divan = "0.1.14"
duckdb = { version = "0.10.1", features = ["bundled"] }
Expand Down
11 changes: 4 additions & 7 deletions vortex-array/src/array/primitive/compute/filter_indices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ fn apply_predicate<T: NativePType, F: Fn(&T, &T) -> bool>(

#[cfg(test)]
mod test {
use itertools::Itertools;
use vortex_dtype::field::FieldPath;
use vortex_expr::{lit, Conjunction, FieldPathOperations};

Expand All @@ -80,13 +79,11 @@ mod test {
}

fn to_int_indices(filtered_primitive: BoolArray) -> Vec<u64> {
let filtered = filtered_primitive
filtered_primitive
.boolean_buffer()
.iter()
.enumerate()
.flat_map(|(idx, v)| if v { Some(idx as u64) } else { None })
.collect_vec();
filtered
.set_indices()
.map(|i| i as u64)
.collect()
}

#[test]
Expand Down
88 changes: 87 additions & 1 deletion vortex-array/src/array/struct/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use serde::{Deserialize, Serialize};
use vortex_dtype::{FieldNames, Nullability, StructDType};
use vortex_error::vortex_bail;
use vortex_error::{vortex_bail, vortex_err};

use crate::array::primitive::PrimitiveArray;
use crate::stats::ArrayStatisticsCompute;
use crate::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata};
use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};
Expand Down Expand Up @@ -98,6 +99,41 @@ impl StructArray {
}
}

impl StructArray {
/// Return a new StructArray with the given projection applied.
///
/// Projection does not copy data arrays. Projection is defined by an ordinal array slice
/// which specifies the new ordering of columns in the struct. The projection can be used to
/// perform column re-ordering, deletion, or duplication at a logical level, without any data
/// copying.
///
/// This function will return an error if the projection includes invalid column IDs.
pub fn project(self, projection: &[usize]) -> VortexResult<Self> {
let mut children = Vec::with_capacity(projection.len());
let mut names = Vec::with_capacity(projection.len());

let validity = self.validity().take(
&PrimitiveArray::from(projection.iter().map(|idx| *idx as u64).collect::<Vec<_>>())
.into_array(),
)?;

for column_idx in projection {
children.push(
self.field(*column_idx)
.ok_or_else(|| vortex_err!(InvalidArgument: "column index out of bounds"))?,
);
names.push(self.names()[*column_idx].clone());
}

StructArray::try_new(
FieldNames::from(names.as_slice()),
children,
self.len(),
validity,
)
}
}

impl ArrayFlatten for StructArray {
/// StructEncoding is the canonical form for a [DType::Struct] array, so return self.
fn flatten(self) -> VortexResult<Flattened> {
Expand Down Expand Up @@ -134,3 +170,53 @@ impl AcceptArrayVisitor for StructArray {
impl ArrayStatisticsCompute for StructArray {}

impl EncodingCompression for StructEncoding {}

#[cfg(test)]
mod test {
use vortex_dtype::{DType, FieldName, FieldNames, Nullability};

use crate::array::bool::BoolArray;
use crate::array::primitive::PrimitiveArray;
use crate::array::r#struct::StructArray;
use crate::array::varbin::VarBinArray;
use crate::validity::Validity;
use crate::{ArrayTrait, IntoArray};

#[test]
fn test_project() {
let xs = PrimitiveArray::from_vec(vec![0i64, 1, 2, 3, 4], Validity::NonNullable);
let ys = VarBinArray::from_vec(
vec!["a", "b", "c", "d", "e"],
DType::Utf8(Nullability::NonNullable),
);
let zs = BoolArray::from_vec(vec![true, true, true, false, false], Validity::NonNullable);

let struct_a = StructArray::try_new(
FieldNames::from(["xs".into(), "ys".into(), "zs".into()]),
vec![xs.into_array(), ys.into_array(), zs.into_array()],
5,
Validity::NonNullable,
)
.unwrap();

let struct_b = struct_a.project(&[2usize, 0]).unwrap();
assert_eq!(
struct_b.names().to_vec(),
vec![FieldName::from("zs"), FieldName::from("xs")],
);

assert_eq!(struct_b.len(), 5);

let bools = BoolArray::try_from(struct_b.field(0).unwrap()).unwrap();
assert_eq!(
bools.boolean_buffer().iter().collect::<Vec<_>>(),
vec![true, true, true, false, false]
);

let prims = PrimitiveArray::try_from(struct_b.field(1).unwrap()).unwrap();
assert_eq!(
prims.scalar_buffer::<i64>().to_vec(),
vec![0i64, 1, 2, 3, 4]
);
}
}
22 changes: 22 additions & 0 deletions vortex-array/src/flatten.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use arrow_array::ArrayRef;
use vortex_error::VortexResult;

use crate::array::bool::BoolArray;
Expand All @@ -7,6 +8,7 @@ use crate::array::primitive::PrimitiveArray;
use crate::array::r#struct::StructArray;
use crate::array::varbin::VarBinArray;
use crate::array::varbinview::VarBinViewArray;
use crate::compute::as_arrow::AsArrowArray;
use crate::encoding::ArrayEncoding;
use crate::{Array, IntoArray};

Expand All @@ -21,6 +23,26 @@ pub enum Flattened {
Extension(ExtensionArray),
}

impl Flattened {
/// Convert a flat array into its equivalent [ArrayRef](Arrow array).
///
/// Scalar arrays such as Bool and Primitive flattened arrays though should convert with
/// zero copies, while more complex variants such as Struct may require allocations if its child
/// arrays require decompression.
pub fn into_arrow(self) -> ArrayRef {
match self {
Flattened::Null(a) => a.as_arrow(),
Flattened::Bool(a) => a.as_arrow(),
Flattened::Primitive(a) => a.as_arrow(),
Flattened::Struct(a) => a.as_arrow(),
Flattened::VarBin(a) => a.as_arrow(),
Flattened::VarBinView(a) => a.as_arrow(),
Flattened::Extension(a) => a.as_arrow(),
}
.expect("flat array must convert to ArrayRef")
}
}

/// Support trait for transmuting an array into its [vortex_dtype::DType]'s canonical encoding.
///
/// Flattening an Array ensures that the array's encoding matches one of the builtin canonical
Expand Down
1 change: 0 additions & 1 deletion vortex-array/src/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ impl Debug for ArrayView {
f.debug_struct("ArrayView")
.field("encoding", &self.encoding)
.field("dtype", &self.dtype)
// .field("array", &self.array)
.field("buffers", &self.buffers)
.field("ctx", &self.ctx)
.finish()
Expand Down
4 changes: 2 additions & 2 deletions vortex-buffer/src/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use std::str::Utf8Error;

use crate::Buffer;

/// A wrapper around a `Buffer` that guarantees that the buffer contains valid UTF-8.
/// A wrapper around a [`Buffer`] that guarantees that the buffer contains valid UTF-8.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd)]
pub struct BufferString(Buffer);

impl BufferString {
/// Creates a new `BufferString` from a `Buffer`.
/// Creates a new `BufferString` from a [`Buffer`].
///
/// # Safety
/// Assumes that the buffer contains valid UTF-8.
Expand Down
35 changes: 35 additions & 0 deletions vortex-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[package]
name = "vortex-datafusion"
version.workspace = true
homepage.workspace = true
repository.workspace = true
authors.workspace = true
license.workspace = true
keywords.workspace = true
include.workspace = true
edition.workspace = true
rust-version.workspace = true

[dependencies]
vortex-array = { path = "../vortex-array" }
vortex-dtype = { path = "../vortex-dtype", features = ["arrow"] }
vortex-error = { path = "../vortex-error" }

arrow-array = { workspace = true }
arrow-schema = { workspace = true }
async-trait = { workspace = true }

datafusion = { workspace = true }
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
futures = { workspace = true }
pin-project = { workspace = true }

[dev-dependencies]
tokio = { workspace = true, features = ["test-util"] }

[lints]
workspace = true
Loading