Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastiantia committed Feb 19, 2025
1 parent 3de721f commit 73ca09c
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 17 deletions.
13 changes: 3 additions & 10 deletions kernel/src/actions/visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ impl RowVisitor for CdcVisitor {
))
);
for i in 0..row_count {
// Since path column is required, use it to detect presence of an Add action
// Since path column is required, use it to detect presence of an Cdc action
if let Some(path) = getters[0].get_opt(i, "cdc.path")? {
self.cdcs.push(Self::visit_cdc(i, path, getters)?);
}
Expand Down Expand Up @@ -474,15 +474,8 @@ impl RowVisitor for SidecarVisitor {
))
);
for i in 0..row_count {
// Since path column is required, use it to detect presence of a sidecar action
let opt_path: Option<String> = getters[0].get_opt(i, "sidecar.path")?;
if let Some(path) = opt_path {
// TODO: Known issue here https://github.com/apache/arrow-rs/issues/7119
// Once https://github.com/delta-io/delta-kernel-rs/pull/692 is merged, remove empty path check.
// This is a workaround to avoid constructing a sidecar action with an empty path for testing.
if path.is_empty() {
continue;
}
// Since path column is required, use it to detect presence of an Sidecar action
if let Some(path) = getters[0].get_opt(i, "sidecar.path")? {
self.sidecars.push(Self::visit_sidecar(i, path, getters)?);
}
}
Expand Down
18 changes: 11 additions & 7 deletions kernel/src/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,12 @@ impl LogSegment {
.map(|f| f.location.clone())
.collect();

let parquet_handler = engine.get_parquet_handler();

// Historically, we had a shared file reader trait for JSON and Parquet handlers,
// but it was removed to avoid unnecessary coupling. This is a concrete case
// where it *could* have been useful, but for now, we're keeping them separate.
// If similar patterns start appearing elsewhere, we should reconsider that decision.
let actions = if checkpoint_parts
.first()
.is_some_and(|p| p.extension == "json")
Expand All @@ -269,14 +275,13 @@ impl LogSegment {
meta_predicate,
)?
} else {
engine.get_parquet_handler().read_parquet_files(
parquet_handler.read_parquet_files(
&checkpoint_file_meta,
checkpoint_read_schema,
meta_predicate,
)?
};

let parquet_handler = engine.get_parquet_handler();
let actions_iter = actions
.map(move |batch_result| -> DeltaResult<_> {
let checkpoint_batch = batch_result?;
Expand All @@ -286,14 +291,14 @@ impl LogSegment {
// 1. In the case where the schema does not contain add/remove actions, we return the checkpoint
// batch directly as sidecar files only have to be read when the schema contains add/remove actions.
// 2. Multi-part checkpoint batches never have sidecar actions, so the batch is returned as-is.
let sidecar_content = if !need_add_actions || checkpoint_parts.len() > 1 {
None
} else {
let sidecar_content = if need_add_actions && checkpoint_parts.len() == 1 {
Self::process_sidecars(
parquet_handler.clone(), // cheap Arc clone
log_root.clone(),
checkpoint_batch.as_ref(),
)?
} else {
None
};

let combined_batches = std::iter::once(Ok(checkpoint_batch))
Expand All @@ -303,8 +308,7 @@ impl LogSegment {
Ok(combined_batches)
})
.flatten_ok()
// Map converts Result<Result<Box<dyn EngineData>, _>,_> to Result<Box<dyn EngineData>, _>
.map(|result| result?);
.map(|result| result?); // result-result to result

Ok(actions_iter)
}
Expand Down

0 comments on commit 73ca09c

Please sign in to comment.