
Commit

Merge remote-tracking branch 'upstream/main' into remove-visit-snapshot-schema
zachschuermann committed Feb 21, 2025
2 parents 94aab62 + baa3fc3 commit 1e52a9a
Showing 56 changed files with 932 additions and 769 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/build.yml
@@ -37,7 +37,7 @@ jobs:
cargo install cargo-msrv --locked
- name: verify-msrv
run: |
cargo msrv --path kernel/ verify --all-features
cargo msrv --path kernel/ verify --features $(cat .github/workflows/default-kernel-features)
cargo msrv --path derive-macros/ verify --all-features
cargo msrv --path ffi/ verify --all-features
cargo msrv --path ffi-proc-macros/ verify --all-features
@@ -104,7 +104,7 @@ jobs:
- name: check kernel builds with no-default-features
run: cargo build -p delta_kernel --no-default-features
- name: build and lint with clippy
run: cargo clippy --benches --tests --all-features -- -D warnings
run: cargo clippy --benches --tests --features $(cat .github/workflows/default-kernel-features) -- -D warnings
- name: lint without default features
run: cargo clippy --no-default-features -- -D warnings
- name: check kernel builds with default-engine
@@ -129,7 +129,7 @@ jobs:
override: true
- uses: Swatinem/rust-cache@v2
- name: test
run: cargo test --workspace --verbose --all-features -- --skip read_table_version_hdfs
run: cargo test --workspace --verbose --features $(cat .github/workflows/default-kernel-features) -- --skip read_table_version_hdfs

ffi_test:
runs-on: ${{ matrix.os }}
@@ -229,7 +229,7 @@ jobs:
uses: taiki-e/install-action@cargo-llvm-cov
- uses: Swatinem/rust-cache@v2
- name: Generate code coverage
run: cargo llvm-cov --all-features --workspace --codecov --output-path codecov.json -- --skip read_table_version_hdfs
run: cargo llvm-cov --features $(cat .github/workflows/default-kernel-features) --workspace --codecov --output-path codecov.json -- --skip read_table_version_hdfs
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5
with:
1 change: 1 addition & 0 deletions .github/workflows/default-kernel-features
@@ -0,0 +1 @@
integration-test,default-engine,default-engine-rustls,cloud,arrow,sync-engine
15 changes: 0 additions & 15 deletions Cargo.toml
@@ -23,21 +23,6 @@ rust-version = "1.80"
version = "0.6.1"

[workspace.dependencies]
# When changing the arrow version range, also modify ffi/Cargo.toml which has
its own arrow version ranges with modified features. Failure to do so will
# result in compilation errors as two different sets of arrow dependencies may
# be sourced
arrow = { version = ">=53, <55" }
arrow-arith = { version = ">=53, <55" }
arrow-array = { version = ">=53, <55" }
arrow-buffer = { version = ">=53, <55" }
arrow-cast = { version = ">=53, <55" }
arrow-data = { version = ">=53, <55" }
arrow-ord = { version = ">=53, <55" }
arrow-json = { version = ">=53, <55" }
arrow-select = { version = ">=53, <55" }
arrow-schema = { version = ">=53, <55" }
parquet = { version = ">=53, <55", features = ["object_store"] }
object_store = { version = ">=0.11, <0.12" }
hdfs-native-object-store = "0.12.0"
hdfs-native = "0.10.0"
7 changes: 1 addition & 6 deletions acceptance/Cargo.toml
@@ -14,19 +14,14 @@ rust-version.workspace = true
release = false

[dependencies]
arrow-array = { workspace = true }
arrow-cast = { workspace = true }
arrow-ord = { workspace = true }
arrow-select = { workspace = true }
arrow-schema = { workspace = true }
delta_kernel = { path = "../kernel", features = [
"default-engine",
"arrow_53",
"developer-visibility",
] }
futures = "0.3"
itertools = "0.13"
object_store = { workspace = true }
parquet = { workspace = true }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror = "1"
17 changes: 10 additions & 7 deletions acceptance/src/data.rs
@@ -1,15 +1,18 @@
use std::{path::Path, sync::Arc};

use arrow_array::{Array, RecordBatch};
use arrow_ord::sort::{lexsort_to_indices, SortColumn};
use arrow_schema::{DataType, Schema};
use arrow_select::{concat::concat_batches, filter::filter_record_batch, take::take};
use delta_kernel::arrow::array::{Array, RecordBatch};
use delta_kernel::arrow::compute::{
concat_batches, filter_record_batch, lexsort_to_indices, take, SortColumn,
};
use delta_kernel::arrow::datatypes::{DataType, Schema};

use delta_kernel::parquet::arrow::async_reader::{
ParquetObjectReader, ParquetRecordBatchStreamBuilder,
};
use delta_kernel::{engine::arrow_data::ArrowEngineData, DeltaResult, Engine, Error, Table};
use futures::{stream::TryStreamExt, StreamExt};
use itertools::Itertools;
use object_store::{local::LocalFileSystem, ObjectStore};
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};

use crate::{TestCaseInfo, TestResult};

@@ -83,8 +86,8 @@ fn assert_schema_fields_match(schema: &Schema, golden: &Schema) {
fn normalize_col(col: Arc<dyn Array>) -> Arc<dyn Array> {
if let DataType::Timestamp(unit, Some(zone)) = col.data_type() {
if **zone == *"+00:00" {
arrow_cast::cast::cast(&col, &DataType::Timestamp(*unit, Some("UTC".into())))
.expect("Could not cast to UTC")
let data_type = DataType::Timestamp(*unit, Some("UTC".into()));
delta_kernel::arrow::compute::cast(&col, &data_type).expect("Could not cast to UTC")
} else {
col
}
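The acceptance changes above illustrate the broader pattern in this commit: consumers now reach arrow only through delta_kernel's re-export modules, so they always compile against the exact arrow version the kernel was built with. A minimal sketch of that pattern, assuming delta_kernel::arrow mirrors the arrow crate's array/compute/datatypes modules (as the imports above suggest) and that a matching arrow feature such as arrow_53 is enabled; the helper name widen_to_i64 is hypothetical:

    use std::sync::Arc;

    use delta_kernel::arrow::array::{ArrayRef, Int32Array};
    use delta_kernel::arrow::compute::cast;
    use delta_kernel::arrow::datatypes::DataType;

    // Hypothetical helper: widen an int32 column to int64 using only the kernel's
    // re-exported arrow APIs instead of a directly-pinned arrow-* crate.
    fn widen_to_i64(values: Vec<i32>) -> ArrayRef {
        let col: ArrayRef = Arc::new(Int32Array::from(values));
        cast(&col, &DataType::Int64).expect("int32 -> int64 cast should not fail")
    }

With this pattern the arrow-array, arrow-cast, arrow-ord, arrow-select, and arrow-schema entries removed from acceptance/Cargo.toml above are no longer needed; the only arrow-related choice a consumer makes is delta_kernel's feature selection.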
2 changes: 1 addition & 1 deletion feature-tests/Cargo.toml
@@ -12,7 +12,7 @@ version.workspace = true
release = false

[dependencies]
delta_kernel = { path = "../kernel" }
delta_kernel = { path = "../kernel", features = ["arrow_53"] }

[features]
default-engine = [ "delta_kernel/default-engine" ]
15 changes: 2 additions & 13 deletions ffi/Cargo.toml
@@ -22,21 +22,13 @@ tracing-core = { version = "0.1", optional = true }
tracing-subscriber = { version = "0.3", optional = true, features = [ "json" ] }
url = "2"
delta_kernel = { path = "../kernel", default-features = false, features = [
"arrow",
"developer-visibility",
] }
delta_kernel_ffi_macros = { path = "../ffi-proc-macros", version = "0.6.1" }

# used if we use the default engine to be able to move arrow data into the c-ffi format
arrow-schema = { version = ">=53, <55", default-features = false, features = [
"ffi",
], optional = true }
arrow-data = { version = ">=53, <55", default-features = false, features = [
"ffi",
], optional = true }
arrow-array = { version = ">=53, <55", default-features = false, optional = true }

[build-dependencies]
cbindgen = "0.27.0"
cbindgen = "0.28"
libc = "0.2.158"

[dev-dependencies]
@@ -52,9 +44,6 @@ default = ["default-engine"]
cloud = ["delta_kernel/cloud"]
default-engine = [
"delta_kernel/default-engine",
"arrow-array",
"arrow-data",
"arrow-schema",
]
tracing = [ "tracing-core", "tracing-subscriber" ]
sync-engine = ["delta_kernel/sync-engine"]
2 changes: 1 addition & 1 deletion ffi/cbindgen.toml
@@ -25,4 +25,4 @@ parse_deps = true
# only crates found in this list will ever be parsed.
#
# default: there is no allow-list (NOTE: this is the opposite of [])
include = ["delta_kernel", "arrow-data", "arrow-schema"]
include = ["arrow", "arrow-data", "arrow-schema", "delta_kernel"]
126 changes: 40 additions & 86 deletions ffi/examples/read-table/arrow.c
@@ -11,6 +11,7 @@ ArrowContext* init_arrow_context()
context->num_batches = 0;
context->batches = NULL;
context->cur_filter = NULL;
context->cur_transform = NULL;
return context;
}

@@ -50,86 +51,10 @@ static GArrowRecordBatch* get_record_batch(FFI_ArrowArray* array, GArrowSchema*
return record_batch;
}

// Add columns to a record batch for each partition. In a "real" engine we would want to parse the
// string values into the correct data type. This program just adds all partition columns as strings
// for simplicity
static GArrowRecordBatch* add_partition_columns(
GArrowRecordBatch* record_batch,
PartitionList* partition_cols,
const CStringMap* partition_values)
{
gint64 rows = garrow_record_batch_get_n_rows(record_batch);
gint64 cols = garrow_record_batch_get_n_columns(record_batch);
GArrowRecordBatch* cur_record_batch = record_batch;
GError* error = NULL;
for (uintptr_t i = 0; i < partition_cols->len; i++) {
char* col = partition_cols->cols[i];
guint pos = cols + i;
KernelStringSlice key = { col, strlen(col) };
char* partition_val = get_from_string_map(partition_values, key, allocate_string);
print_diag(
" Adding partition column '%s' with value '%s' at column %u\n",
col,
partition_val ? partition_val : "NULL",
pos);
GArrowStringArrayBuilder* builder = garrow_string_array_builder_new();
for (gint64 i = 0; i < rows; i++) {
if (partition_val) {
garrow_string_array_builder_append_string(builder, partition_val, &error);
} else {
garrow_array_builder_append_null((GArrowArrayBuilder*)builder, &error);
}
if (report_g_error("Can't append to partition column builder", error)) {
break;
}
}

if (partition_val) {
free(partition_val);
}

if (error != NULL) {
printf("Giving up on column %s\n", col);
g_error_free(error);
g_object_unref(builder);
error = NULL;
continue;
}

GArrowArray* partition_col = garrow_array_builder_finish((GArrowArrayBuilder*)builder, &error);
if (report_g_error("Can't build string array for partition column", error)) {
printf("Giving up on column %s\n", col);
g_error_free(error);
g_object_unref(builder);
error = NULL;
continue;
}
g_object_unref(builder);

GArrowDataType* string_data_type = (GArrowDataType*)garrow_string_data_type_new();
GArrowField* field = garrow_field_new(col, string_data_type);
GArrowRecordBatch* old_batch = cur_record_batch;
cur_record_batch = garrow_record_batch_add_column(old_batch, pos, field, partition_col, &error);
g_object_unref(old_batch);
g_object_unref(partition_col);
g_object_unref(string_data_type);
g_object_unref(field);
if (cur_record_batch == NULL) {
if (error != NULL) {
printf("Could not add column at %u: %s\n", pos, error->message);
g_error_free(error);
}
}
}
return cur_record_batch;
}

// append a batch to our context
static void add_batch_to_context(
ArrowContext* context,
ArrowFFIData* arrow_data,
PartitionList* partition_cols,
const CStringMap* partition_values)
ArrowFFIData* arrow_data)
{
GArrowSchema* schema = get_schema(&arrow_data->schema);
GArrowRecordBatch* record_batch = get_record_batch(&arrow_data->array, schema);
@@ -142,11 +67,6 @@ static void add_batch_to_context(
g_object_unref(context->cur_filter);
context->cur_filter = NULL;
}
record_batch = add_partition_columns(record_batch, partition_cols, partition_values);
if (record_batch == NULL) {
printf("Failed to add partition columns, not adding batch\n");
return;
}
context->batches = g_list_append(context->batches, record_batch);
context->num_batches++;
print_diag(
@@ -187,28 +107,61 @@ static GArrowBooleanArray* slice_to_arrow_bool_array(const KernelBoolSlice slice
return (GArrowBooleanArray*)ret;
}

// This will apply the transform in the context to the specified data. This consumes the passed
// ExclusiveEngineData and returns a new transformed one
static ExclusiveEngineData* apply_transform(
struct EngineContext* context,
ExclusiveEngineData* data) {
if (!context->arrow_context->cur_transform) {
print_diag(" No transform needed");
return data;
}
print_diag(" Applying transform\n");
SharedExpressionEvaluator* evaluator = get_evaluator(
context->engine,
context->read_schema, // input schema
context->arrow_context->cur_transform,
context->logical_schema); // output schema
ExternResultHandleExclusiveEngineData transformed_res = evaluate(
context->engine,
&data,
evaluator);
free_engine_data(data);
free_evaluator(evaluator);
if (transformed_res.tag != OkHandleExclusiveEngineData) {
print_error("Failed to transform read data.", (Error*)transformed_res.err);
free_error((Error*)transformed_res.err);
return NULL;
}
return transformed_res.ok;
}

// This is the callback that will be called for each chunk of data read from the parquet file
static void visit_read_data(void* vcontext, ExclusiveEngineData* data)
{
print_diag(" Converting read data to arrow\n");
struct EngineContext* context = vcontext;
ExternResultArrowFFIData arrow_res = get_raw_arrow_data(data, context->engine);
ExclusiveEngineData* transformed = apply_transform(context, data);
if (!transformed) {
exit(-1);
}
ExternResultArrowFFIData arrow_res = get_raw_arrow_data(transformed, context->engine);
if (arrow_res.tag != OkArrowFFIData) {
print_error("Failed to get arrow data.", (Error*)arrow_res.err);
free_error((Error*)arrow_res.err);
exit(-1);
}
ArrowFFIData* arrow_data = arrow_res.ok;
add_batch_to_context(
context->arrow_context, arrow_data, context->partition_cols, context->partition_values);
add_batch_to_context(context->arrow_context, arrow_data);
free(arrow_data); // just frees the struct, the data and schema are freed/owned by add_batch_to_context
}

// We call this for each file we get called back to read in read_table.c::visit_callback
void c_read_parquet_file(
struct EngineContext* context,
const KernelStringSlice path,
const KernelBoolSlice selection_vector)
const KernelBoolSlice selection_vector,
const Expression* transform)
{
int full_len = strlen(context->table_root) + path.len + 1;
char* full_path = malloc(sizeof(char) * full_len);
@@ -233,6 +186,7 @@ void c_read_parquet_file(
}
context->arrow_context->cur_filter = sel_array;
}
context->arrow_context->cur_transform = transform;
ExclusiveFileReadResultIterator* read_iter = read_res.ok;
for (;;) {
ExternResultbool ok_res = read_result_next(read_iter, context, visit_read_data);
4 changes: 3 additions & 1 deletion ffi/examples/read-table/arrow.h
@@ -15,13 +15,15 @@ typedef struct ArrowContext
gsize num_batches;
GList* batches;
GArrowBooleanArray* cur_filter;
const Expression* cur_transform;
} ArrowContext;

ArrowContext* init_arrow_context(void);
void c_read_parquet_file(
struct EngineContext* context,
const KernelStringSlice path,
const KernelBoolSlice selection_vector);
const KernelBoolSlice selection_vector,
const Expression* transform);
void print_arrow_context(ArrowContext* context);
void free_arrow_context(ArrowContext* context);

