Part 4: read_table.c uses transform in ffi (#614)
Use the new transform functionality to transform data over FFI. This lets us get rid of all the gross partition-adding code in C :)

In particular:
- remove `add_partition_columns` in `arrow.c`; we don't need it anymore
- expose FFI methods to get an expression evaluator and evaluate an
  expression from C (sketched below)
- use the above to add an `apply_transform` function in `arrow.c`
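
For illustration, here is a minimal sketch (not code from this PR) of how a C engine can drive the new FFI calls. It mirrors `apply_transform` in the diff below and assumes `engine`, `read_schema`, `logical_schema`, `transform`, and `data` are already in hand from the scan state and row callback:

```c
// Minimal sketch of the new evaluator FFI flow (assumes all handles are valid).
SharedExpressionEvaluator* evaluator = get_evaluator(
    engine,
    read_schema,      // input (physical) schema
    transform,        // Expression handed to the scan row callback
    logical_schema);  // output (logical) schema
ExternResultHandleExclusiveEngineData res = evaluate(engine, &data, evaluator);
free_engine_data(data);   // evaluate does not consume the input batch
free_evaluator(evaluator);
if (res.tag != OkHandleExclusiveEngineData) {
  print_error("Failed to transform read data.", (Error*)res.err);
  free_error((Error*)res.err);
  return; // or otherwise handle the failure
}
ExclusiveEngineData* transformed = res.ok; // batch now matches the logical schema
```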

## How was this change tested?

- existing tests
nicklan authored Feb 21, 2025
1 parent e2bcd0b commit baa3fc3
Showing 10 changed files with 263 additions and 106 deletions.
126 changes: 40 additions & 86 deletions ffi/examples/read-table/arrow.c
@@ -11,6 +11,7 @@ ArrowContext* init_arrow_context()
context->num_batches = 0;
context->batches = NULL;
context->cur_filter = NULL;
context->cur_transform = NULL;
return context;
}

@@ -50,86 +51,10 @@ static GArrowRecordBatch* get_record_batch(FFI_ArrowArray* array, GArrowSchema*
return record_batch;
}

-// Add columns to a record batch for each partition. In a "real" engine we would want to parse the
-// string values into the correct data type. This program just adds all partition columns as strings
-// for simplicity
-static GArrowRecordBatch* add_partition_columns(
-  GArrowRecordBatch* record_batch,
-  PartitionList* partition_cols,
-  const CStringMap* partition_values)
-{
-  gint64 rows = garrow_record_batch_get_n_rows(record_batch);
-  gint64 cols = garrow_record_batch_get_n_columns(record_batch);
-  GArrowRecordBatch* cur_record_batch = record_batch;
-  GError* error = NULL;
-  for (uintptr_t i = 0; i < partition_cols->len; i++) {
-    char* col = partition_cols->cols[i];
-    guint pos = cols + i;
-    KernelStringSlice key = { col, strlen(col) };
-    char* partition_val = get_from_string_map(partition_values, key, allocate_string);
-    print_diag(
-      " Adding partition column '%s' with value '%s' at column %u\n",
-      col,
-      partition_val ? partition_val : "NULL",
-      pos);
-    GArrowStringArrayBuilder* builder = garrow_string_array_builder_new();
-    for (gint64 i = 0; i < rows; i++) {
-      if (partition_val) {
-        garrow_string_array_builder_append_string(builder, partition_val, &error);
-      } else {
-        garrow_array_builder_append_null((GArrowArrayBuilder*)builder, &error);
-      }
-      if (report_g_error("Can't append to partition column builder", error)) {
-        break;
-      }
-    }
-
-    if (partition_val) {
-      free(partition_val);
-    }
-
-    if (error != NULL) {
-      printf("Giving up on column %s\n", col);
-      g_error_free(error);
-      g_object_unref(builder);
-      error = NULL;
-      continue;
-    }
-
-    GArrowArray* partition_col = garrow_array_builder_finish((GArrowArrayBuilder*)builder, &error);
-    if (report_g_error("Can't build string array for partition column", error)) {
-      printf("Giving up on column %s\n", col);
-      g_error_free(error);
-      g_object_unref(builder);
-      error = NULL;
-      continue;
-    }
-    g_object_unref(builder);
-
-    GArrowDataType* string_data_type = (GArrowDataType*)garrow_string_data_type_new();
-    GArrowField* field = garrow_field_new(col, string_data_type);
-    GArrowRecordBatch* old_batch = cur_record_batch;
-    cur_record_batch = garrow_record_batch_add_column(old_batch, pos, field, partition_col, &error);
-    g_object_unref(old_batch);
-    g_object_unref(partition_col);
-    g_object_unref(string_data_type);
-    g_object_unref(field);
-    if (cur_record_batch == NULL) {
-      if (error != NULL) {
-        printf("Could not add column at %u: %s\n", pos, error->message);
-        g_error_free(error);
-      }
-    }
-  }
-  return cur_record_batch;
-}

// append a batch to our context
static void add_batch_to_context(
ArrowContext* context,
-  ArrowFFIData* arrow_data,
-  PartitionList* partition_cols,
-  const CStringMap* partition_values)
ArrowFFIData* arrow_data)
{
GArrowSchema* schema = get_schema(&arrow_data->schema);
GArrowRecordBatch* record_batch = get_record_batch(&arrow_data->array, schema);
@@ -142,11 +67,6 @@ static void add_batch_to_context(
g_object_unref(context->cur_filter);
context->cur_filter = NULL;
}
-  record_batch = add_partition_columns(record_batch, partition_cols, partition_values);
-  if (record_batch == NULL) {
-    printf("Failed to add partition columns, not adding batch\n");
-    return;
-  }
context->batches = g_list_append(context->batches, record_batch);
context->num_batches++;
print_diag(
@@ -187,28 +107,61 @@ static GArrowBooleanArray* slice_to_arrow_bool_array(const KernelBoolSlice slice
return (GArrowBooleanArray*)ret;
}

// This will apply the transform in the context to the specified data. This consumes the passed
// ExclusiveEngineData and returns a new, transformed one
static ExclusiveEngineData* apply_transform(
struct EngineContext* context,
ExclusiveEngineData* data) {
if (!context->arrow_context->cur_transform) {
print_diag(" No transform needed");
return data;
}
print_diag(" Applying transform\n");
SharedExpressionEvaluator* evaluator = get_evaluator(
context->engine,
context->read_schema, // input schema
context->arrow_context->cur_transform,
context->logical_schema); // output schema
ExternResultHandleExclusiveEngineData transformed_res = evaluate(
context->engine,
&data,
evaluator);
free_engine_data(data);
free_evaluator(evaluator);
if (transformed_res.tag != OkHandleExclusiveEngineData) {
print_error("Failed to transform read data.", (Error*)transformed_res.err);
free_error((Error*)transformed_res.err);
return NULL;
}
return transformed_res.ok;
}

// This is the callback that will be called for each chunk of data read from the parquet file
static void visit_read_data(void* vcontext, ExclusiveEngineData* data)
{
print_diag(" Converting read data to arrow\n");
struct EngineContext* context = vcontext;
-  ExternResultArrowFFIData arrow_res = get_raw_arrow_data(data, context->engine);
ExclusiveEngineData* transformed = apply_transform(context, data);
if (!transformed) {
exit(-1);
}
ExternResultArrowFFIData arrow_res = get_raw_arrow_data(transformed, context->engine);
if (arrow_res.tag != OkArrowFFIData) {
print_error("Failed to get arrow data.", (Error*)arrow_res.err);
free_error((Error*)arrow_res.err);
exit(-1);
}
ArrowFFIData* arrow_data = arrow_res.ok;
-  add_batch_to_context(
-    context->arrow_context, arrow_data, context->partition_cols, context->partition_values);
add_batch_to_context(context->arrow_context, arrow_data);
  free(arrow_data); // just frees the struct; the data and schema are owned and freed by add_batch_to_context
}

// We call this for each file that visit_callback (in read_table.c) asks us to read
void c_read_parquet_file(
struct EngineContext* context,
const KernelStringSlice path,
-  const KernelBoolSlice selection_vector)
const KernelBoolSlice selection_vector,
const Expression* transform)
{
int full_len = strlen(context->table_root) + path.len + 1;
char* full_path = malloc(sizeof(char) * full_len);
@@ -233,6 +186,7 @@ void c_read_parquet_file(
}
context->arrow_context->cur_filter = sel_array;
}
context->arrow_context->cur_transform = transform;
ExclusiveFileReadResultIterator* read_iter = read_res.ok;
for (;;) {
ExternResultbool ok_res = read_result_next(read_iter, context, visit_read_data);
4 changes: 3 additions & 1 deletion ffi/examples/read-table/arrow.h
@@ -15,13 +15,15 @@ typedef struct ArrowContext
gsize num_batches;
GList* batches;
GArrowBooleanArray* cur_filter;
const Expression* cur_transform;
} ArrowContext;

ArrowContext* init_arrow_context(void);
void c_read_parquet_file(
struct EngineContext* context,
const KernelStringSlice path,
-  const KernelBoolSlice selection_vector);
const KernelBoolSlice selection_vector,
const Expression* transform);
void print_arrow_context(ArrowContext* context);
void free_arrow_context(ArrowContext* context);

8 changes: 6 additions & 2 deletions ffi/examples/read-table/read_table.c
@@ -50,6 +50,7 @@ void scan_row_callback(
int64_t size,
const Stats* stats,
const DvInfo* dv_info,
const Expression* transform,
const CStringMap* partition_values)
{
(void)size; // not using this at the moment
@@ -76,7 +77,7 @@ void scan_row_callback(
context->partition_values = partition_values;
print_partition_info(context, partition_values);
#ifdef PRINT_ARROW_DATA
-  c_read_parquet_file(context, path, selection_vector);
c_read_parquet_file(context, path, selection_vector, transform);
#endif
free_bool_slice(selection_vector);
context->partition_values = NULL;
@@ -273,10 +274,12 @@ int main(int argc, char* argv[])

SharedScan* scan = scan_res.ok;
SharedGlobalScanState* global_state = get_global_scan_state(scan);
SharedSchema* logical_schema = get_global_logical_schema(global_state);
SharedSchema* read_schema = get_global_read_schema(global_state);
PartitionList* partition_cols = get_partition_list(global_state);
struct EngineContext context = {
global_state,
logical_schema,
read_schema,
table_root,
engine,
@@ -321,7 +324,8 @@

free_kernel_scan_data(data_iter);
free_scan(scan);
-  free_global_read_schema(read_schema);
free_schema(logical_schema);
free_schema(read_schema);
free_global_scan_state(global_state);
free_snapshot(snapshot);
free_engine(engine);
1 change: 1 addition & 0 deletions ffi/examples/read-table/read_table.h
@@ -14,6 +14,7 @@ typedef struct PartitionList
struct EngineContext
{
SharedGlobalScanState* global_state;
SharedSchema* logical_schema;
SharedSchema* read_schema;
char* table_root;
SharedExternEngine* engine;
115 changes: 113 additions & 2 deletions ffi/src/engine_funcs.rs
@@ -2,7 +2,10 @@
use std::sync::Arc;

-use delta_kernel::{schema::Schema, DeltaResult, FileDataReadResultIterator};
use delta_kernel::{
schema::{DataType, Schema, SchemaRef},
DeltaResult, EngineData, Expression, ExpressionEvaluator, FileDataReadResultIterator,
};
use delta_kernel_ffi_macros::handle_descriptor;
use tracing::debug;
use url::Url;
@@ -97,7 +100,7 @@ pub unsafe extern "C" fn free_read_result_iter(data: Handle<ExclusiveFileReadRes
/// Caller is responsible for calling with a valid `ExternEngineHandle` and `FileMeta`
#[no_mangle]
pub unsafe extern "C" fn read_parquet_file(
-    engine: Handle<SharedExternEngine>,
engine: Handle<SharedExternEngine>, // TODO Does this cause a free?
file: &FileMeta,
physical_schema: Handle<SharedSchema>,
) -> ExternResult<Handle<ExclusiveFileReadResultIterator>> {
@@ -130,3 +133,111 @@ fn read_parquet_file_impl(
});
Ok(res.into())
}

// Expression Eval

#[handle_descriptor(target=dyn ExpressionEvaluator, mutable=false)]
pub struct SharedExpressionEvaluator;

/// Get the evaluator as provided by the passed engine's `ExpressionHandler`.
///
/// # Safety
/// Caller is responsible for calling with a valid `Engine`, `Expression`, and `SharedSchema`s
#[no_mangle]
pub unsafe extern "C" fn get_evaluator(
engine: Handle<SharedExternEngine>,
input_schema: Handle<SharedSchema>,
expression: &Expression,
// TODO: Make this a data_type, and give a way for c code to go between schema <-> datatype
output_type: Handle<SharedSchema>,
) -> Handle<SharedExpressionEvaluator> {
let engine = unsafe { engine.clone_as_arc() };
let input_schema = unsafe { input_schema.clone_as_arc() };
let output_type: DataType = output_type.as_ref().clone().into();
get_evaluator_impl(engine, input_schema, expression, output_type)
}

fn get_evaluator_impl(
extern_engine: Arc<dyn ExternEngine>,
input_schema: SchemaRef,
expression: &Expression,
output_type: DataType,
) -> Handle<SharedExpressionEvaluator> {
let engine = extern_engine.engine();
let evaluator = engine.get_expression_handler().get_evaluator(
input_schema,
expression.clone(),
output_type,
);
evaluator.into()
}

/// Free an evaluator
/// # Safety
///
/// Caller is responsible for passing a valid handle.
#[no_mangle]
pub unsafe extern "C" fn free_evaluator(evaluator: Handle<SharedExpressionEvaluator>) {
debug!("engine released evaluator");
evaluator.drop_handle();
}

/// Use the passed `evaluator` to evaluate its expression against the passed `batch` data.
///
/// # Safety
/// Caller is responsible for calling with a valid `Engine`, `ExclusiveEngineData`, and `Evaluator`
#[no_mangle]
pub unsafe extern "C" fn evaluate(
engine: Handle<SharedExternEngine>,
batch: &mut Handle<ExclusiveEngineData>,
evaluator: Handle<SharedExpressionEvaluator>,
) -> ExternResult<Handle<ExclusiveEngineData>> {
let engine = unsafe { engine.clone_as_arc() };
let batch = unsafe { batch.as_mut() };
let evaluator = unsafe { evaluator.clone_as_arc() };
let res = evaluate_impl(batch, evaluator.as_ref());
res.into_extern_result(&engine.as_ref())
}

fn evaluate_impl(
batch: &dyn EngineData,
evaluator: &dyn ExpressionEvaluator,
) -> DeltaResult<Handle<ExclusiveEngineData>> {
evaluator.evaluate(batch).map(Into::into)
}

#[cfg(test)]
mod tests {
use super::{free_evaluator, get_evaluator};
use crate::{free_engine, handle::Handle, scan::SharedSchema, tests::get_default_engine};
use delta_kernel::{
schema::{DataType, StructField, StructType},
Expression,
};
use std::sync::Arc;

#[test]
fn test_get_evaluator() {
let engine = get_default_engine();
let in_schema = Arc::new(StructType::new(vec![StructField::new(
"a",
DataType::LONG,
true,
)]));
let expr = Expression::literal(1);
let output_type: Handle<SharedSchema> = in_schema.clone().into();
let in_schema_handle: Handle<SharedSchema> = in_schema.into();
unsafe {
let evaluator = get_evaluator(
engine.shallow_copy(),
in_schema_handle.shallow_copy(),
&expr,
output_type.shallow_copy(),
);
in_schema_handle.drop_handle();
output_type.drop_handle();
free_engine(engine);
free_evaluator(evaluator);
}
}
}