
Commit

review
sebastiantia committed Feb 21, 2025
1 parent 626f7b4 commit ff67fcf
Showing 2 changed files with 69 additions and 54 deletions.
40 changes: 22 additions & 18 deletions kernel/src/log_segment.rs
@@ -4,7 +4,7 @@
 use crate::actions::visitors::SidecarVisitor;
 use crate::actions::{
     get_log_add_schema, get_log_schema, Metadata, Protocol, Sidecar, ADD_NAME, METADATA_NAME,
-    PROTOCOL_NAME, SIDECAR_NAME,
+    PROTOCOL_NAME, REMOVE_NAME, SIDECAR_NAME,
 };
 use crate::path::{LogPathFileType, ParsedLogPath};
 use crate::schema::SchemaRef;
@@ -218,13 +218,8 @@ impl LogSegment {
             .read_json_files(&commit_files, commit_read_schema, meta_predicate.clone())?
             .map_ok(|batch| (batch, true));
 
-        let checkpoint_stream = Self::create_checkpoint_stream(
-            engine,
-            checkpoint_read_schema,
-            meta_predicate,
-            self.checkpoint_parts.clone(),
-            self.log_root.clone(),
-        )?;
+        let checkpoint_stream =
+            self.create_checkpoint_stream(engine, checkpoint_read_schema, meta_predicate)?;
 
         Ok(commit_stream.chain(checkpoint_stream))
     }
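
The hunk above keeps the overall shape of the log-replay stream: commit batches are tagged true, checkpoint batches false, and commits are replayed ahead of the checkpoint. A minimal standalone sketch of that pattern (with String standing in for EngineData batches; the names here are illustrative, not the kernel's API):

    fn main() {
        // Commit batches come first and are tagged `true`; checkpoint batches
        // follow and are tagged `false`, mirroring `map_ok(|batch| (batch, true))`
        // and `commit_stream.chain(checkpoint_stream)` above.
        let commit_batches = vec!["commit-0", "commit-1"];
        let checkpoint_batches = vec!["checkpoint-0"];

        let replay = commit_batches
            .into_iter()
            .map(|b| (b, true))
            .chain(checkpoint_batches.into_iter().map(|b| (b, false)));

        for (batch, is_commit) in replay {
            println!("{batch}: from commit file = {is_commit}");
        }
    }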
@@ -241,21 +236,22 @@
     /// stored directly in the checkpoint. The sidecar file batches are chained to the
     /// checkpoint batch in the top level iterator to be returned.
     fn create_checkpoint_stream(
+        &self,
         engine: &dyn Engine,
         checkpoint_read_schema: SchemaRef,
         meta_predicate: Option<ExpressionRef>,
-        checkpoint_parts: Vec<ParsedLogPath>,
-        log_root: Url,
     ) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
-        let need_add_actions = checkpoint_read_schema.contains(ADD_NAME);
+        let need_file_actions = checkpoint_read_schema.contains(ADD_NAME)
+            | checkpoint_read_schema.contains(REMOVE_NAME);
         require!(
-            !need_add_actions || checkpoint_read_schema.contains(SIDECAR_NAME),
+            !need_file_actions || checkpoint_read_schema.contains(SIDECAR_NAME),
             Error::invalid_checkpoint(
                 "If the checkpoint read schema contains file actions, it must contain the sidecar column"
             )
         );
 
-        let checkpoint_file_meta: Vec<FileMeta> = checkpoint_parts
+        let checkpoint_file_meta: Vec<_> = self
+            .checkpoint_parts
             .iter()
             .map(|f| f.location.clone())
             .collect();
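
For context, the require! guard enforces the v2-checkpoint invariant that this commit broadens from ADD alone to both file actions: if the read schema selects ADD or REMOVE, it must also select SIDECAR, since file actions may live in sidecar files rather than in the checkpoint itself. A hedged sketch of the same check over plain string column names (slice contains stands in for the kernel's schema lookup):

    fn validate(schema_columns: &[&str]) -> Result<(), String> {
        // Mirrors `need_file_actions`: a file action is `add` or `remove`.
        let need_file_actions =
            schema_columns.contains(&"add") || schema_columns.contains(&"remove");
        if need_file_actions && !schema_columns.contains(&"sidecar") {
            return Err(
                "checkpoint read schema with file actions must include the sidecar column".into(),
            );
        }
        Ok(())
    }

    fn main() {
        assert!(validate(&["add", "sidecar"]).is_ok());
        assert!(validate(&["remove"]).is_err()); // the case the new REMOVE test exercises
        assert!(validate(&["metadata"]).is_ok()); // no file actions: sidecar not required
    }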
@@ -266,7 +262,8 @@
         // but it was removed to avoid unnecessary coupling. This is a concrete case
         // where it *could* have been useful, but for now, we're keeping them separate.
         // If similar patterns start appearing elsewhere, we should reconsider that decision.
-        let actions = if checkpoint_parts
+        let actions = if self
+            .checkpoint_parts
             .first()
             .is_some_and(|p| p.extension == "json")
         {
@@ -282,17 +279,18 @@
                 meta_predicate,
             )?
         };
+        let log_root = self.log_root.clone();
 
         let actions_iter = actions
-            .map(move |batch_result| -> DeltaResult<_> {
-                let checkpoint_batch = batch_result?;
+            .map(move |checkpoint_batch_result| -> DeltaResult<_> {
+                let checkpoint_batch = checkpoint_batch_result?;
                 // This closure maps the checkpoint batch to an iterator of batches
                 // by chaining the checkpoint batch with sidecar batches if they exist.
 
                 // 1. In the case where the schema contains no file actions, we return the checkpoint
                 // batch directly, as sidecar files only have to be read when the schema contains file actions.
                 // 2. Multi-part checkpoint batches never have sidecar actions, so the batch is returned as-is.
-                let sidecar_content = if need_add_actions && checkpoint_parts.len() == 1 {
+                let sidecar_content = if need_file_actions && checkpoint_file_meta.len() == 1 {
                     Self::process_sidecars(
                         parquet_handler.clone(), // cheap Arc clone
                         log_root.clone(),
@@ -304,6 +302,7 @@
 
                 let combined_batches = std::iter::once(Ok(checkpoint_batch))
                     .chain(sidecar_content.into_iter().flatten())
+                    // The boolean flag indicates whether the batch originated from a commit file (true) or a checkpoint file (false).
                     .map_ok(|sidecar_batch| (sidecar_batch, false));
 
                 Ok(combined_batches)
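
Each checkpoint batch is thus expanded into a small iterator: the batch itself chained with any sidecar batches it references, all tagged false. A self-contained sketch of that expansion, assuming String batches and a plain String error in place of EngineData and DeltaResult:

    fn expand(
        checkpoint_batch: Result<String, String>,
        sidecar_batches: Option<Vec<Result<String, String>>>,
    ) -> impl Iterator<Item = Result<(String, bool), String>> {
        // `None` (schema without file actions, or a multi-part checkpoint)
        // means the checkpoint batch passes through alone.
        std::iter::once(checkpoint_batch)
            .chain(sidecar_batches.into_iter().flatten())
            .map(|result| result.map(|batch| (batch, false)))
    }

    fn main() {
        let batches: Vec<_> = expand(
            Ok("checkpoint".to_string()),
            Some(vec![Ok("sidecar1".to_string()), Ok("sidecar2".to_string())]),
        )
        .collect();
        assert_eq!(batches.len(), 3); // checkpoint batch plus two sidecar batches
    }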
@@ -354,7 +353,12 @@ impl LogSegment {
             Ok(FileMeta {
                 location: log_root.join("_sidecars/")?.join(&sidecar.path)?,
                 last_modified: sidecar.modification_time,
-                size: sidecar.size_in_bytes as usize,
+                size: sidecar.size_in_bytes.try_into().map_err(|_| {
+                    Error::generic(format!(
+                        "Failed to convert sidecar size {} to usize",
+                        sidecar.size_in_bytes
+                    ))
+                })?,
             })
         }

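The replacement makes the narrowing conversion fallible: try_into returns an error for a negative size_in_bytes (or on overflow for 32-bit targets), where the old as usize cast would silently wrap. A small sketch of the same pattern, with a String error standing in for Error::generic:

    fn to_size(size_in_bytes: i64) -> Result<usize, String> {
        // Fails instead of wrapping, matching the `map_err` in the diff above.
        size_in_bytes
            .try_into()
            .map_err(|_| format!("Failed to convert sidecar size {size_in_bytes} to usize"))
    }

    fn main() {
        assert_eq!(to_size(1024), Ok(1024));
        assert!(to_size(-1).is_err());
    }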
83 changes: 47 additions & 36 deletions kernel/src/log_segment/tests.rs
@@ -856,21 +856,26 @@ fn test_checkpoint_batch_with_sidecar_files_that_do_not_exist() -> DeltaResult<()
 }
 
 #[test]
-fn test_create_checkpoint_stream_returns_none_if_checkpoint_parts_is_empty() -> DeltaResult<()> {
+fn test_create_checkpoint_stream_errors_when_schema_has_remove_but_no_sidecar_action(
+) -> DeltaResult<()> {
     let engine = SyncEngine::new();
+    let log_root = Url::parse("s3://example-bucket/logs/")?;
 
-    let mut iter = LogSegment::create_checkpoint_stream(
+    // Create the stream over checkpoint batches.
+    let log_segment = LogSegment::try_new(
+        vec![],
+        vec![create_log_path("file:///00000000000000000001.parquet")],
+        log_root,
+        None,
+    )?;
+    let result = log_segment.create_checkpoint_stream(
         &engine,
-        get_log_schema().clone(),
+        get_log_schema().project(&[REMOVE_NAME])?,
         None,
-        vec![],
-        Url::parse("s3://example-bucket/logs/")?,
-    )
-    .into_iter()
-    .flatten();
+    );
 
-    // Assert no batches are returned
-    assert!(iter.next().is_none());
+    // Errors because the schema has a REMOVE action but no SIDECAR action.
+    assert!(result.is_err());
 
     Ok(())
 }
@@ -879,15 +884,16 @@ fn test_create_checkpoint_stream_returns_none_if_checkpoint_parts_is_empty() ->
 fn test_create_checkpoint_stream_errors_when_schema_has_add_but_no_sidecar_action(
 ) -> DeltaResult<()> {
     let engine = SyncEngine::new();
+    let log_root = Url::parse("s3://example-bucket/logs/")?;
 
     // Create the stream over checkpoint batches.
-    let result = LogSegment::create_checkpoint_stream(
-        &engine,
-        get_log_add_schema().clone(),
-        None,
+    let log_segment = LogSegment::try_new(
         vec![],
-        Url::parse("s3://example-bucket/logs/")?,
-    );
+        vec![create_log_path("file:///00000000000000000001.parquet")],
+        log_root,
+        None,
+    )?;
+    let result = log_segment.create_checkpoint_stream(&engine, get_log_add_schema().clone(), None);
 
     // Errors because the schema has an ADD action but no SIDECAR action.
     assert!(result.is_err());
@@ -917,13 +923,14 @@ fn test_create_checkpoint_stream_returns_checkpoint_batches_as_is_if_schema_has_
 
     let v2_checkpoint_read_schema = get_log_schema().project(&[METADATA_NAME])?;
 
-    let mut iter = LogSegment::create_checkpoint_stream(
-        &engine,
-        v2_checkpoint_read_schema.clone(),
-        None,
+    let log_segment = LogSegment::try_new(
+        vec![],
         vec![create_log_path(&checkpoint_one_file)],
         log_root,
+        None,
     )?;
+    let mut iter =
+        log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?;
 
     // Assert that the first batch returned is from reading checkpoint file 1
     let (first_batch, is_log_batch) = iter.next().unwrap()?;
@@ -969,16 +976,17 @@ fn test_create_checkpoint_stream_returns_checkpoint_batches_if_checkpoint_is_mul
 
     let v2_checkpoint_read_schema = get_log_schema().project(&[ADD_NAME, SIDECAR_NAME])?;
 
-    let mut iter = LogSegment::create_checkpoint_stream(
-        &engine,
-        v2_checkpoint_read_schema.clone(),
-        None,
+    let log_segment = LogSegment::try_new(
+        vec![],
         vec![
             create_log_path(&checkpoint_one_file),
             create_log_path(&checkpoint_two_file),
         ],
         log_root,
+        None,
     )?;
+    let mut iter =
+        log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?;
 
     // Assert the correctness of batches returned
     for expected_sidecar in ["sidecar1.parquet", "sidecar2.parquet"].iter() {
@@ -1019,13 +1027,14 @@ fn test_create_checkpoint_stream_reads_parquet_checkpoint_batch_without_sidecars
 
     let v2_checkpoint_read_schema = get_log_schema().project(&[ADD_NAME, SIDECAR_NAME])?;
 
-    let mut iter = LogSegment::create_checkpoint_stream(
-        &engine,
-        v2_checkpoint_read_schema.clone(),
-        None,
+    let log_segment = LogSegment::try_new(
+        vec![],
         vec![create_log_path(&checkpoint_one_file)],
         log_root,
+        None,
     )?;
+    let mut iter =
+        log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?;
 
     // Assert that the first batch returned is from reading checkpoint file 1
     let (first_batch, is_log_batch) = iter.next().unwrap()?;
@@ -1064,13 +1073,14 @@ fn test_create_checkpoint_stream_reads_json_checkpoint_batch_without_sidecars()
 
     let v2_checkpoint_read_schema = get_log_schema().project(&[ADD_NAME, SIDECAR_NAME])?;
 
-    let mut iter = LogSegment::create_checkpoint_stream(
-        &engine,
-        v2_checkpoint_read_schema.clone(),
-        None,
+    let log_segment = LogSegment::try_new(
+        vec![],
         vec![create_log_path(&checkpoint_one_file)],
         log_root,
+        None,
     )?;
+    let mut iter =
+        log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?;
 
     // Assert that the first batch returned is from reading checkpoint file 1
     let (first_batch, is_log_batch) = iter.next().unwrap()?;
@@ -1127,13 +1137,14 @@ fn test_create_checkpoint_stream_reads_checkpoint_file_and_returns_sidecar_batch
 
     let v2_checkpoint_read_schema = get_log_schema().project(&[ADD_NAME, SIDECAR_NAME])?;
 
-    let mut iter = LogSegment::create_checkpoint_stream(
-        &engine,
-        v2_checkpoint_read_schema.clone(),
-        None,
+    let log_segment = LogSegment::try_new(
+        vec![],
         vec![create_log_path(&checkpoint_file_path)],
         log_root,
+        None,
    )?;
+    let mut iter =
+        log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?;
 
     // Assert that the first batch returned is from reading checkpoint file 1
     let (first_batch, is_log_batch) = iter.next().unwrap()?;
