Skip to content

Commit

Permalink
feat: make ExpressionHandler::get_evaluator fallible
Browse files Browse the repository at this point in the history
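Make ExpressionHandler::get_evaluator() return DeltaResult<Arc<dyn ExpressionEvaluator>> so that failures while constructing an evaluator can be reported to the caller. Update all call sites to handle the Result type: most propagate the error with `?`; the DataSkippingFilter code, whose enclosing function returns an Option, simplifies error handling by converting a failure to None with `.ok()?` (the error itself is discarded); and scan_action_iter now returns a Box<dyn Iterator> that yields the error as its single item when the evaluator cannot be built, which also requires its input iterator to be 'static.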
  • Loading branch information
devin-ai-integration[bot] committed Dec 7, 2024
1 parent 3b456e4 commit c6025aa
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 35 deletions.
6 changes: 3 additions & 3 deletions kernel/src/engine/arrow_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,12 +519,12 @@ impl ExpressionHandler for ArrowExpressionHandler {
schema: SchemaRef,
expression: Expression,
output_type: DataType,
) -> Arc<dyn ExpressionEvaluator> {
Arc::new(DefaultExpressionEvaluator {
) -> DeltaResult<Arc<dyn ExpressionEvaluator>> {
Ok(Arc::new(DefaultExpressionEvaluator {
input_schema: schema,
expression: Box::new(expression),
output_type,
})
}))
}
}

Expand Down
2 changes: 1 addition & 1 deletion kernel/src/engine/default/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl<E: TaskExecutor> DefaultEngine<E> {
input_schema.into(),
transform.clone(),
output_schema.clone().into(),
);
)?;
let physical_data = logical_to_physical_expr.evaluate(data)?;
self.parquet
.write_parquet_file(
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ pub trait ExpressionHandler: AsAny {
schema: SchemaRef,
expression: Expression,
output_type: DataType,
) -> Arc<dyn ExpressionEvaluator>;
) -> DeltaResult<Arc<dyn ExpressionEvaluator>>;
}

/// Provides file system related functionalities to Delta Kernel.
Expand Down
37 changes: 21 additions & 16 deletions kernel/src/scan/data_skipping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,24 +117,29 @@ impl DataSkippingFilter {
//
// 3. The selection evaluator does DISTINCT(col(predicate), 'false') to produce true (= keep) when
// the predicate is true/null and false (= skip) when the predicate is false.
let select_stats_evaluator = engine.get_expression_handler().get_evaluator(
// safety: kernel is very broken if we don't have the schema for Add actions
get_log_add_schema().clone(),
STATS_EXPR.clone(),
DataType::STRING,
);
let select_stats_evaluator = engine
.get_expression_handler()
.get_evaluator(
// safety: kernel is very broken if we don't have the schema for Add actions
get_log_add_schema().clone(),
STATS_EXPR.clone(),
DataType::STRING,
)
.ok()?;

let skipping_evaluator = engine.get_expression_handler().get_evaluator(
stats_schema.clone(),
Expr::struct_from([as_data_skipping_predicate(predicate, false)?]),
PREDICATE_SCHEMA.clone(),
);
let skipping_evaluator = engine
.get_expression_handler()
.get_evaluator(
stats_schema.clone(),
Expr::struct_from([as_data_skipping_predicate(predicate, false)?]),
PREDICATE_SCHEMA.clone(),
)
.ok()?;

let filter_evaluator = engine.get_expression_handler().get_evaluator(
stats_schema.clone(),
FILTER_EXPR.clone(),
DataType::BOOLEAN,
);
let filter_evaluator = engine
.get_expression_handler()
.get_evaluator(stats_schema.clone(), FILTER_EXPR.clone(), DataType::BOOLEAN)
.ok()?;

Some(Self {
stats_schema,
Expand Down
29 changes: 19 additions & 10 deletions kernel/src/scan/log_replay.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::clone::Clone;
use std::collections::HashSet;
use std::iter;
use std::sync::{Arc, LazyLock};

use tracing::debug;
Expand Down Expand Up @@ -241,22 +242,30 @@ impl LogReplayScanner {
/// indicates whether the record batch is a log or checkpoint batch.
pub fn scan_action_iter(
engine: &dyn Engine,
action_iter: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>>,
action_iter: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + 'static,
table_schema: &SchemaRef,
predicate: Option<ExpressionRef>,
) -> impl Iterator<Item = DeltaResult<ScanData>> {
) -> Box<dyn Iterator<Item = DeltaResult<ScanData>>> {
let mut log_scanner = LogReplayScanner::new(engine, table_schema, predicate);
let add_transform = engine.get_expression_handler().get_evaluator(
match engine.get_expression_handler().get_evaluator(
get_log_add_schema().clone(),
get_add_transform_expr(),
SCAN_ROW_DATATYPE.clone(),
);
action_iter
.map(move |action_res| {
let (batch, is_log_batch) = action_res?;
log_scanner.process_scan_batch(add_transform.as_ref(), batch.as_ref(), is_log_batch)
})
.filter(|res| res.as_ref().map_or(true, |(_, sv)| sv.contains(&true)))
) {
Ok(add_transform) => Box::new(
action_iter
.map(move |action_res| {
let (batch, is_log_batch) = action_res?;
log_scanner.process_scan_batch(
add_transform.as_ref(),
batch.as_ref(),
is_log_batch,
)
})
.filter(|res| res.as_ref().map_or(true, |(_, sv)| sv.contains(&true))),
),
Err(e) => Box::new(iter::once(Err(e))),
}
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ fn transform_to_logical_internal(
read_schema,
read_expression,
global_state.logical_schema.clone().into(),
)
)?
.evaluate(data.as_ref())?;
Ok(result)
}
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/table_changes/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ impl LogReplayScanner {
get_log_add_schema().clone(),
add_transform_expr(),
scan_row_schema().into(),
);
)?;

let result = action_iter.map(move |actions| -> DeltaResult<_> {
let actions = actions?;
Expand Down
4 changes: 2 additions & 2 deletions kernel/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ fn generate_adds<'a>(
write_metadata_schema.clone(),
adds_expr,
log_schema.clone().into(),
);
)?;
adds_evaluator.evaluate(write_metadata_batch)
})
}
Expand Down Expand Up @@ -321,7 +321,7 @@ fn generate_commit_info(
engine_commit_info_schema.into(),
commit_info_expr,
commit_info_empty_struct_schema.into(),
);
)?;

commit_info_evaluator.evaluate(engine_commit_info)
}
Expand Down

0 comments on commit c6025aa

Please sign in to comment.