From baa3fc3b0c1e14a87961f8c399c39b0737a34ee6 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Thu, 20 Feb 2025 16:51:01 -0800 Subject: [PATCH] Part 4: read_table.c uses transform in ffi (#614) Use new transform functionality to transform data over FFI. This lets us get rid of all the gross partition adding code in c :) In particular: - remove `add_partition_columns` in `arrow.c`, we don't need it anymore - expose ffi methods to get an expression evaluator and evaluate an expression from `c` - use the above to add an `apply_transform` function in `arrow.c` ## How was this change tested? - existing tests --- ffi/examples/read-table/arrow.c | 126 ++++++------------ ffi/examples/read-table/arrow.h | 4 +- ffi/examples/read-table/read_table.c | 8 +- ffi/examples/read-table/read_table.h | 1 + ffi/src/engine_funcs.rs | 115 +++++++++++++++- ffi/src/expressions/kernel.rs | 25 +++- ffi/src/lib.rs | 4 +- ffi/src/scan.rs | 65 ++++++++- .../expected-data/basic-partitioned.expected | 16 +-- kernel/src/engine/arrow_expression.rs | 5 + 10 files changed, 263 insertions(+), 106 deletions(-) diff --git a/ffi/examples/read-table/arrow.c b/ffi/examples/read-table/arrow.c index c6214df6b..106836997 100644 --- a/ffi/examples/read-table/arrow.c +++ b/ffi/examples/read-table/arrow.c @@ -11,6 +11,7 @@ ArrowContext* init_arrow_context() context->num_batches = 0; context->batches = NULL; context->cur_filter = NULL; + context->cur_transform = NULL; return context; } @@ -50,86 +51,10 @@ static GArrowRecordBatch* get_record_batch(FFI_ArrowArray* array, GArrowSchema* return record_batch; } -// Add columns to a record batch for each partition. In a "real" engine we would want to parse the -// string values into the correct data type. This program just adds all partition columns as strings -// for simplicity -static GArrowRecordBatch* add_partition_columns( - GArrowRecordBatch* record_batch, - PartitionList* partition_cols, - const CStringMap* partition_values) -{ - gint64 rows = garrow_record_batch_get_n_rows(record_batch); - gint64 cols = garrow_record_batch_get_n_columns(record_batch); - GArrowRecordBatch* cur_record_batch = record_batch; - GError* error = NULL; - for (uintptr_t i = 0; i < partition_cols->len; i++) { - char* col = partition_cols->cols[i]; - guint pos = cols + i; - KernelStringSlice key = { col, strlen(col) }; - char* partition_val = get_from_string_map(partition_values, key, allocate_string); - print_diag( - " Adding partition column '%s' with value '%s' at column %u\n", - col, - partition_val ? 
partition_val : "NULL", - pos); - GArrowStringArrayBuilder* builder = garrow_string_array_builder_new(); - for (gint64 i = 0; i < rows; i++) { - if (partition_val) { - garrow_string_array_builder_append_string(builder, partition_val, &error); - } else { - garrow_array_builder_append_null((GArrowArrayBuilder*)builder, &error); - } - if (report_g_error("Can't append to partition column builder", error)) { - break; - } - } - - if (partition_val) { - free(partition_val); - } - - if (error != NULL) { - printf("Giving up on column %s\n", col); - g_error_free(error); - g_object_unref(builder); - error = NULL; - continue; - } - - GArrowArray* partition_col = garrow_array_builder_finish((GArrowArrayBuilder*)builder, &error); - if (report_g_error("Can't build string array for partition column", error)) { - printf("Giving up on column %s\n", col); - g_error_free(error); - g_object_unref(builder); - error = NULL; - continue; - } - g_object_unref(builder); - - GArrowDataType* string_data_type = (GArrowDataType*)garrow_string_data_type_new(); - GArrowField* field = garrow_field_new(col, string_data_type); - GArrowRecordBatch* old_batch = cur_record_batch; - cur_record_batch = garrow_record_batch_add_column(old_batch, pos, field, partition_col, &error); - g_object_unref(old_batch); - g_object_unref(partition_col); - g_object_unref(string_data_type); - g_object_unref(field); - if (cur_record_batch == NULL) { - if (error != NULL) { - printf("Could not add column at %u: %s\n", pos, error->message); - g_error_free(error); - } - } - } - return cur_record_batch; -} - // append a batch to our context static void add_batch_to_context( ArrowContext* context, - ArrowFFIData* arrow_data, - PartitionList* partition_cols, - const CStringMap* partition_values) + ArrowFFIData* arrow_data) { GArrowSchema* schema = get_schema(&arrow_data->schema); GArrowRecordBatch* record_batch = get_record_batch(&arrow_data->array, schema); @@ -142,11 +67,6 @@ static void add_batch_to_context( g_object_unref(context->cur_filter); context->cur_filter = NULL; } - record_batch = add_partition_columns(record_batch, partition_cols, partition_values); - if (record_batch == NULL) { - printf("Failed to add partition columns, not adding batch\n"); - return; - } context->batches = g_list_append(context->batches, record_batch); context->num_batches++; print_diag( @@ -187,20 +107,52 @@ static GArrowBooleanArray* slice_to_arrow_bool_array(const KernelBoolSlice slice return (GArrowBooleanArray*)ret; } +// This will apply the transform in the context to the specified data. 
This consumes the passed
+// ExclusiveEngineData and returns a new, transformed one
+static ExclusiveEngineData* apply_transform(
+  struct EngineContext* context,
+  ExclusiveEngineData* data) {
+  if (!context->arrow_context->cur_transform) {
+    print_diag("  No transform needed\n");
+    return data;
+  }
+  print_diag("  Applying transform\n");
+  SharedExpressionEvaluator* evaluator = get_evaluator(
+    context->engine,
+    context->read_schema, // input schema
+    context->arrow_context->cur_transform,
+    context->logical_schema); // output schema
+  ExternResultHandleExclusiveEngineData transformed_res = evaluate(
+    context->engine,
+    &data,
+    evaluator);
+  free_engine_data(data);
+  free_evaluator(evaluator);
+  if (transformed_res.tag != OkHandleExclusiveEngineData) {
+    print_error("Failed to transform read data.", (Error*)transformed_res.err);
+    free_error((Error*)transformed_res.err);
+    return NULL;
+  }
+  return transformed_res.ok;
+}
+
 // This is the callback that will be called for each chunk of data read from the parquet file
 static void visit_read_data(void* vcontext, ExclusiveEngineData* data)
 {
   print_diag("  Converting read data to arrow\n");
   struct EngineContext* context = vcontext;
-  ExternResultArrowFFIData arrow_res = get_raw_arrow_data(data, context->engine);
+  ExclusiveEngineData* transformed = apply_transform(context, data);
+  if (!transformed) {
+    exit(-1);
+  }
+  ExternResultArrowFFIData arrow_res = get_raw_arrow_data(transformed, context->engine);
   if (arrow_res.tag != OkArrowFFIData) {
     print_error("Failed to get arrow data.", (Error*)arrow_res.err);
     free_error((Error*)arrow_res.err);
     exit(-1);
   }
   ArrowFFIData* arrow_data = arrow_res.ok;
-  add_batch_to_context(
-    context->arrow_context, arrow_data, context->partition_cols, context->partition_values);
+  add_batch_to_context(context->arrow_context, arrow_data);
   free(arrow_data); // just frees the struct, the data and schema are freed/owned by add_batch_to_context
 }
 
@@ -208,7 +160,8 @@ static void visit_read_data(void* vcontext, ExclusiveEngineData* data)
 void c_read_parquet_file(
   struct EngineContext* context,
   const KernelStringSlice path,
-  const KernelBoolSlice selection_vector)
+  const KernelBoolSlice selection_vector,
+  const Expression* transform)
 {
   int full_len = strlen(context->table_root) + path.len + 1;
   char* full_path = malloc(sizeof(char) * full_len);
@@ -233,6 +186,7 @@ void c_read_parquet_file(
     }
     context->arrow_context->cur_filter = sel_array;
   }
+  context->arrow_context->cur_transform = transform;
   ExclusiveFileReadResultIterator* read_iter = read_res.ok;
   for (;;) {
     ExternResultbool ok_res = read_result_next(read_iter, context, visit_read_data);
diff --git a/ffi/examples/read-table/arrow.h b/ffi/examples/read-table/arrow.h
index 0236b238b..8f34cdd4f 100644
--- a/ffi/examples/read-table/arrow.h
+++ b/ffi/examples/read-table/arrow.h
@@ -15,13 +15,15 @@ typedef struct ArrowContext
   gsize num_batches;
   GList* batches;
   GArrowBooleanArray* cur_filter;
+  const Expression* cur_transform;
 } ArrowContext;
 
 ArrowContext* init_arrow_context(void);
 void c_read_parquet_file(
   struct EngineContext* context,
   const KernelStringSlice path,
-  const KernelBoolSlice selection_vector);
+  const KernelBoolSlice selection_vector,
+  const Expression* transform);
 void print_arrow_context(ArrowContext* context);
 void free_arrow_context(ArrowContext* context);
diff --git a/ffi/examples/read-table/read_table.c b/ffi/examples/read-table/read_table.c
index 704559a59..0ddc20ded 100644
--- a/ffi/examples/read-table/read_table.c
+++ b/ffi/examples/read-table/read_table.c
@@ -50,6 +50,7 @@ void scan_row_callback(
   int64_t size,
   const Stats* stats,
   const DvInfo* dv_info,
+  const Expression* transform,
   const CStringMap* partition_values)
 {
   (void)size; // not using this at the moment
@@ -76,7 +77,7 @@ void scan_row_callback(
   context->partition_values = partition_values;
   print_partition_info(context, partition_values);
 #ifdef PRINT_ARROW_DATA
-  c_read_parquet_file(context, path, selection_vector);
+  c_read_parquet_file(context, path, selection_vector, transform);
 #endif
   free_bool_slice(selection_vector);
   context->partition_values = NULL;
@@ -273,10 +274,12 @@ int main(int argc, char* argv[])
 
   SharedScan* scan = scan_res.ok;
   SharedGlobalScanState* global_state = get_global_scan_state(scan);
+  SharedSchema* logical_schema = get_global_logical_schema(global_state);
   SharedSchema* read_schema = get_global_read_schema(global_state);
   PartitionList* partition_cols = get_partition_list(global_state);
   struct EngineContext context = {
     global_state,
+    logical_schema,
     read_schema,
     table_root,
     engine,
@@ -321,7 +324,8 @@ int main(int argc, char* argv[])
 
   free_kernel_scan_data(data_iter);
   free_scan(scan);
-  free_global_read_schema(read_schema);
+  free_schema(logical_schema);
+  free_schema(read_schema);
   free_global_scan_state(global_state);
   free_snapshot(snapshot);
   free_engine(engine);
diff --git a/ffi/examples/read-table/read_table.h b/ffi/examples/read-table/read_table.h
index 28d9c72dc..cf55863d9 100644
--- a/ffi/examples/read-table/read_table.h
+++ b/ffi/examples/read-table/read_table.h
@@ -14,6 +14,7 @@ typedef struct PartitionList
 struct EngineContext
 {
   SharedGlobalScanState* global_state;
+  SharedSchema* logical_schema;
   SharedSchema* read_schema;
   char* table_root;
   SharedExternEngine* engine;
diff --git a/ffi/src/engine_funcs.rs b/ffi/src/engine_funcs.rs
index 1afb60510..7c12bcf51 100644
--- a/ffi/src/engine_funcs.rs
+++ b/ffi/src/engine_funcs.rs
@@ -2,7 +2,10 @@
 
 use std::sync::Arc;
 
-use delta_kernel::{schema::Schema, DeltaResult, FileDataReadResultIterator};
+use delta_kernel::{
+    schema::{DataType, Schema, SchemaRef},
+    DeltaResult, EngineData, Expression, ExpressionEvaluator, FileDataReadResultIterator,
+};
 use delta_kernel_ffi_macros::handle_descriptor;
 use tracing::debug;
 use url::Url;
@@ -97,7 +100,7 @@ pub unsafe extern "C" fn free_read_result_iter(data: Handle<ExclusiveFileReadRe
 
 #[no_mangle]
 pub unsafe extern "C" fn read_parquet_file(
-    engine: Handle<SharedExternEngine>,
+    engine: Handle<SharedExternEngine>, // TODO Does this cause a free?
     file: &FileMeta,
     physical_schema: Handle<SharedSchema>,
 ) -> ExternResult<Handle<ExclusiveFileReadResultIterator>> {
@@ -130,3 +133,120 @@ fn read_parquet_file_impl(
     });
     Ok(res.into())
 }
+
+// Expression Eval
+
+#[handle_descriptor(target=dyn ExpressionEvaluator, mutable=false)]
+pub struct SharedExpressionEvaluator;
+
+/// Get the evaluator as provided by the passed engine's `ExpressionHandler`.
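+///
+/// A minimal C sketch of the intended flow (error handling elided; `engine`, `input_schema`,
+/// `output_schema`, `expr`, and `data` are assumed to already exist):
+///
+/// ```c
+/// SharedExpressionEvaluator* evaluator = get_evaluator(engine, input_schema, expr, output_schema);
+/// ExternResultHandleExclusiveEngineData result = evaluate(engine, &data, evaluator);
+/// free_evaluator(evaluator);
+/// ```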
+///
+/// # Safety
+/// Caller is responsible for calling with a valid `Engine`, `Expression`, and `SharedSchema`s
+#[no_mangle]
+pub unsafe extern "C" fn get_evaluator(
+    engine: Handle<SharedExternEngine>,
+    input_schema: Handle<SharedSchema>,
+    expression: &Expression,
+    // TODO: Make this a data_type, and give a way for c code to go between schema <-> datatype
+    output_type: Handle<SharedSchema>,
+) -> Handle<SharedExpressionEvaluator> {
+    let engine = unsafe { engine.clone_as_arc() };
+    let input_schema = unsafe { input_schema.clone_as_arc() };
+    let output_type: DataType = output_type.as_ref().clone().into();
+    get_evaluator_impl(engine, input_schema, expression, output_type)
+}
+
+fn get_evaluator_impl(
+    extern_engine: Arc<dyn ExternEngine>,
+    input_schema: SchemaRef,
+    expression: &Expression,
+    output_type: DataType,
+) -> Handle<SharedExpressionEvaluator> {
+    let engine = extern_engine.engine();
+    let evaluator = engine.get_expression_handler().get_evaluator(
+        input_schema,
+        expression.clone(),
+        output_type,
+    );
+    evaluator.into()
+}
+
+/// Free an evaluator
+/// # Safety
+///
+/// Caller is responsible for passing a valid handle.
+#[no_mangle]
+pub unsafe extern "C" fn free_evaluator(evaluator: Handle<SharedExpressionEvaluator>) {
+    debug!("engine released evaluator");
+    evaluator.drop_handle();
+}
+
+/// Use the passed `evaluator` to evaluate its expression against the passed `batch` data.
+///
+/// # Safety
+/// Caller is responsible for calling with a valid `Engine`, `ExclusiveEngineData`, and `Evaluator`
+#[no_mangle]
+pub unsafe extern "C" fn evaluate(
+    engine: Handle<SharedExternEngine>,
+    batch: &mut Handle<ExclusiveEngineData>,
+    evaluator: Handle<SharedExpressionEvaluator>,
+) -> ExternResult<Handle<ExclusiveEngineData>> {
+    let engine = unsafe { engine.clone_as_arc() };
+    let batch = unsafe { batch.as_mut() };
+    let evaluator = unsafe { evaluator.clone_as_arc() };
+    let res = evaluate_impl(batch, evaluator.as_ref());
+    res.into_extern_result(&engine.as_ref())
+}
+
+fn evaluate_impl(
+    batch: &dyn EngineData,
+    evaluator: &dyn ExpressionEvaluator,
+) -> DeltaResult<Box<dyn EngineData>> {
+    evaluator.evaluate(batch).map(Into::into)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{free_evaluator, get_evaluator};
+    use crate::{free_engine, handle::Handle, scan::SharedSchema, tests::get_default_engine};
+    use delta_kernel::{
+        schema::{DataType, StructField, StructType},
+        Expression,
+    };
+    use std::sync::Arc;
+
+    #[test]
+    fn test_get_evaluator() {
+        let engine = get_default_engine();
+        let in_schema = Arc::new(StructType::new(vec![StructField::new(
+            "a",
+            DataType::LONG,
+            true,
+        )]));
+        let expr = Expression::literal(1);
+        let output_type: Handle<SharedSchema> = in_schema.clone().into();
+        let in_schema_handle: Handle<SharedSchema> = in_schema.into();
+        unsafe {
+            let evaluator = get_evaluator(
+                engine.shallow_copy(),
+                in_schema_handle.shallow_copy(),
+                &expr,
+                output_type.shallow_copy(),
+            );
+            in_schema_handle.drop_handle();
+            output_type.drop_handle();
+            free_engine(engine);
+            free_evaluator(evaluator);
+        }
+    }
+}
diff --git a/ffi/src/expressions/kernel.rs b/ffi/src/expressions/kernel.rs
index a5116db47..a2a1dcd1f 100644
--- a/ffi/src/expressions/kernel.rs
+++ b/ffi/src/expressions/kernel.rs
@@ -189,6 +189,32 @@ pub struct EngineExpressionVisitor {
 pub unsafe extern "C" fn visit_expression(
     expression: &Handle<SharedExpression>,
     visitor: &mut EngineExpressionVisitor,
+) -> usize {
+    visit_expression_internal(expression.as_ref(), visitor)
+}
+
+/// Visit the expression of the passed [`Expression`] pointer using the provided `visitor`. See the
+/// documentation of [`EngineExpressionVisitor`] for a description of how this visitor works.
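+///
+/// Unlike [`visit_expression`], which takes a `Handle<SharedExpression>`, this function borrows a
+/// plain `&Expression`, such as the optional transform expression passed to the scan callback.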
+///
+/// This method returns the id that the engine generated for the top level expression
+///
+/// # Safety
+///
+/// The caller must pass a valid Expression pointer and expression visitor
+#[no_mangle]
+pub unsafe extern "C" fn visit_expression_ref(
+    expression: &Expression,
+    visitor: &mut EngineExpressionVisitor,
+) -> usize {
+    visit_expression_internal(expression, visitor)
+}
+
+pub fn visit_expression_internal(
+    expression: &Expression,
+    visitor: &mut EngineExpressionVisitor,
 ) -> usize {
     macro_rules! call {
         ( $visitor:ident, $visitor_fn:ident $(, $extra_args:expr) *) => {
@@ -367,6 +390,6 @@ pub unsafe extern "C" fn visit_expression(
         }
     }
     let top_level = call!(visitor, make_field_list, 1);
-    visit_expression_impl(visitor, expression.as_ref(), top_level);
+    visit_expression_impl(visitor, expression, top_level);
     top_level
 }
diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs
index caf04ef2c..eb9644963 100644
--- a/ffi/src/lib.rs
+++ b/ffi/src/lib.rs
@@ -330,7 +330,7 @@ pub unsafe extern "C" fn free_row_indexes(slice: KernelRowIndexArray) {
 /// an opaque struct that encapsulates data read by an engine. this handle can be passed back into
 /// some kernel calls to operate on the data, or can be converted into the raw data as read by the
 /// [`delta_kernel::Engine`] by calling [`get_raw_engine_data`]
-#[handle_descriptor(target=dyn EngineData, mutable=true, sized=false)]
+#[handle_descriptor(target=dyn EngineData, mutable=true)]
 pub struct ExclusiveEngineData;
 
 /// Drop an `ExclusiveEngineData`.
@@ -768,7 +768,7 @@ mod tests {
         }
     }
 
-    fn get_default_engine() -> Handle<SharedExternEngine> {
+    pub(crate) fn get_default_engine() -> Handle<SharedExternEngine> {
         let path = "memory:///doesntmatter/foo";
         let path = kernel_string_slice!(path);
         let builder = unsafe { ok_or_panic(get_engine_builder(path, allocate_err)) };
diff --git a/ffi/src/scan.rs b/ffi/src/scan.rs
index 73f691010..a457b7b9d 100644
--- a/ffi/src/scan.rs
+++ b/ffi/src/scan.rs
@@ -7,7 +7,7 @@ use delta_kernel::scan::state::{visit_scan_files, DvInfo, GlobalScanState};
 use delta_kernel::scan::{Scan, ScanData};
 use delta_kernel::schema::Schema;
 use delta_kernel::snapshot::Snapshot;
-use delta_kernel::{DeltaResult, Error, ExpressionRef};
+use delta_kernel::{DeltaResult, Error, Expression, ExpressionRef};
 use delta_kernel_ffi_macros::handle_descriptor;
 use tracing::debug;
 use url::Url;
@@ -15,6 +15,7 @@ use url::Url;
 use crate::expressions::engine::{
     unwrap_kernel_expression, EnginePredicate, KernelExpressionVisitorState,
 };
+use crate::expressions::SharedExpression;
 use crate::{
     kernel_string_slice, AllocateStringFn, ExclusiveEngineData, ExternEngine, ExternResult,
     IntoExternResult, KernelBoolSlice, KernelRowIndexArray, KernelStringSlice, NullableCvoid,
@@ -99,12 +100,25 @@ pub unsafe extern "C" fn get_global_read_schema(
     state.physical_schema.clone().into()
 }
 
-/// Free a global read schema
+/// Get the kernel view of the logical schema of a scan, i.e. the schema of the data after any
+/// transform has been applied
+///
+/// # Safety
+/// Engine is responsible for providing a valid GlobalScanState pointer
+#[no_mangle]
+pub unsafe extern "C" fn get_global_logical_schema(
+    state: Handle<SharedGlobalScanState>,
+) -> Handle<SharedSchema> {
+    let state = unsafe { state.as_ref() };
+    state.logical_schema.clone().into()
+}
+
+/// Free a schema
 ///
 /// # Safety
 /// Engine is responsible for providing a valid schema obtained via [`get_global_read_schema`]
 #[no_mangle]
-pub unsafe extern "C" fn free_global_read_schema(schema: Handle<SharedSchema>) {
+pub unsafe extern "C" fn free_schema(schema: Handle<SharedSchema>) {
     schema.drop_handle();
 }
 
@@ -263,12 +277,24 @@
 pub struct Stats {
     pub num_records: u64,
 }
 
+/// This callback will be invoked for each valid file that needs to be read for a scan.
+///
+/// The arguments to the callback are:
+/// * `context`: a `void*` context; this can be anything that the engine needs to pass through to each call
+/// * `path`: a `KernelStringSlice` which is the path to the file
+/// * `size`: an `i64` which is the size of the file
+/// * `stats`: an optional [`Stats`] struct with e.g. the number of records in the file; may be `NULL`
+/// * `dv_info`: a [`DvInfo`] struct, which allows getting the selection vector for this file
+/// * `transform`: An optional expression that, if not `NULL`, _must_ be applied to physical data to
+///   convert it to the correct logical format. If this is `NULL`, no transform is needed.
+/// * `partition_values`: [DEPRECATED] a `HashMap<String, String>` of partition values
 type CScanCallback = extern "C" fn(
     engine_context: NullableCvoid,
     path: KernelStringSlice,
     size: i64,
     stats: Option<&Stats>,
     dv_info: &DvInfo,
+    transform: Option<&Expression>,
     partition_map: &CStringMap,
 );
 
@@ -303,10 +328,49 @@ pub unsafe extern "C" fn get_from_string_map(
         .and_then(|v| allocate_fn(kernel_string_slice!(v)))
 }
 
+/// Transformation expressions that need to be applied to each row `i` in ScanData. You can use
+/// [`get_transform_for_row`] to get the transform for a particular row. If that returns an
+/// associated expression, it _must_ be applied to the data read from the file specified by the
+/// row. The resultant schema for this expression is guaranteed to be `Scan.schema()`. If
+/// `get_transform_for_row` returns `NULL`, no expression need be applied and the data read from
+/// disk is already in the correct logical state.
+///
+/// NB: If you are using `visit_scan_data` you don't need to worry about dealing with probing
+/// `CTransforms`. The callback will be invoked with the correct transform for you.
 pub struct CTransforms {
     transforms: Vec<Option<ExpressionRef>>,
 }
 
+#[no_mangle]
+/// Get the transform for a particular row. If the requested row is outside the range of the
+/// passed `CTransforms`, this returns `NULL`; otherwise it returns the element at the index of the
+/// specified row. See also [`CTransforms`] above.
+///
+/// # Safety
+///
+/// The engine is responsible for providing a valid [`CTransforms`] pointer, and for checking if the
+/// return value is `NULL` or not.
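+///
+/// A sketch of the expected C-side usage (names are illustrative only):
+///
+/// ```c
+/// SharedExpression* transform = get_transform_for_row(row, transforms);
+/// if (transform != NULL) {
+///   // apply the transform to the data read for this row
+/// }
+/// ```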
+pub unsafe extern "C" fn get_transform_for_row(
+    row: usize,
+    transforms: &CTransforms,
+) -> Option<Handle<SharedExpression>> {
+    transforms
+        .transforms
+        .get(row)
+        .cloned()
+        .flatten()
+        .map(Into::into)
+}
+
 /// Get a selection vector out of a [`DvInfo`] struct
 ///
 /// # Safety
@@ -369,9 +424,10 @@ fn rust_callback(
     size: i64,
     kernel_stats: Option<delta_kernel::scan::state::Stats>,
     dv_info: DvInfo,
-    _transform: Option<ExpressionRef>,
+    transform: Option<ExpressionRef>,
     partition_values: HashMap<String, String>,
 ) {
+    let transform = transform.map(|e| e.as_ref().clone());
     let partition_map = CStringMap {
         values: partition_values,
     };
@@ -384,6 +440,7 @@ fn rust_callback(
         size,
         stats.as_ref(),
         &dv_info,
+        transform.as_ref(),
         &partition_map,
     );
 }
diff --git a/ffi/tests/read-table-testing/expected-data/basic-partitioned.expected b/ffi/tests/read-table-testing/expected-data/basic-partitioned.expected
index 4a062b104..324ef0086 100644
--- a/ffi/tests/read-table-testing/expected-data/basic-partitioned.expected
+++ b/ffi/tests/read-table-testing/expected-data/basic-partitioned.expected
@@ -6,6 +6,14 @@ Schema:
 ├─ number: long
 └─ a_float: double
 
+letter: [
+  "a",
+  "e",
+  "f",
+  "a",
+  "b",
+  "c"
+]
 number: [
   4,
   5,
@@ -22,11 +30,3 @@ a_float: [
   2.2,
   3.3
 ]
-letter: [
-  "a",
-  "e",
-  "f",
-  "a",
-  "b",
-  "c"
-]
diff --git a/kernel/src/engine/arrow_expression.rs b/kernel/src/engine/arrow_expression.rs
index b7a845171..f830d7249 100644
--- a/kernel/src/engine/arrow_expression.rs
+++ b/kernel/src/engine/arrow_expression.rs
@@ -22,6 +22,7 @@ use crate::arrow::datatypes::{
 };
 use crate::arrow::error::ArrowError;
 use itertools::Itertools;
+use tracing::debug;
 
 use super::arrow_conversion::LIST_ARRAY_ROOT;
 use super::arrow_utils::make_arrow_error;
@@ -538,6 +539,10 @@ pub struct DefaultExpressionEvaluator {
 
 impl ExpressionEvaluator for DefaultExpressionEvaluator {
     fn evaluate(&self, batch: &dyn EngineData) -> DeltaResult<Box<dyn EngineData>> {
+        debug!(
+            "Arrow evaluator evaluating: {:#?}",
+            self.expression.as_ref()
+        );
         let batch = batch
             .any_ref()
             .downcast_ref::<ArrowEngineData>()