Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastiantia committed Feb 1, 2025
1 parent 260ba29 commit 3ea5fbb
Showing 1 changed file with 53 additions and 51 deletions.
104 changes: 53 additions & 51 deletions kernel/src/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,18 +226,19 @@ impl LogSegment {
.map(|f| f.location.clone())
.collect();

let checkpoint_stream = if !checkpoint_parts.is_empty() {
let checkpoint_stream = if checkpoint_parts.is_empty() {
Left(None.into_iter())
} else {
Right(Self::create_checkpoint_stream(
self,
engine,
checkpoint_read_schema,
meta_predicate,
checkpoint_parts,
)?)
} else {
Left(std::iter::empty())
};
return Ok(commit_stream.chain(checkpoint_stream));

Ok(commit_stream.chain(checkpoint_stream))
}

fn create_checkpoint_stream(
Expand All @@ -247,53 +248,52 @@ impl LogSegment {
meta_predicate: Option<ExpressionRef>,
checkpoint_parts: Vec<FileMeta>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
// We checked that checkpoint_parts is not empty before calling this function
let is_json_checkpoint = self.checkpoint_parts[0].extension == "json";

let actions: FileDataReadResultIterator = if is_json_checkpoint {
engine.get_json_handler().read_json_files(
&checkpoint_parts,
checkpoint_read_schema.clone(),
meta_predicate,
)?
} else {
engine.get_parquet_handler().read_parquet_files(
&checkpoint_parts,
checkpoint_read_schema.clone(),
meta_predicate,
)?
};
let actions = Self::read_checkpoint_files(
engine,
checkpoint_parts,
checkpoint_read_schema.clone(),
meta_predicate,
is_json_checkpoint,
)?;

let need_file_actions = checkpoint_read_schema.contains(ADD_NAME);
if need_file_actions {
// if adds were requested, we need sidecars as well!
require!(
checkpoint_read_schema.contains(SIDECAR_NAME),
Error::generic("Checkpoint read schema must contain SIDECAR_NAME")
);
}
require!(
!need_file_actions || checkpoint_read_schema.contains(SIDECAR_NAME),
Error::generic(
"If the checkpoint read schema contains file actions, it must contain the sidecar column"
)
);

let is_multi_part_checkpoint = self.checkpoint_parts.len() > 1;

// Replay is sometimes passed a schema that doesn't contain the sidecar column. (e.g. when reading metadata & protocol)
// In this case, we do not need to read the sidecar files and can chain the checkpoint batch as is.
// If the schema contains file actions, we need to read the sidecar files to replace the checkpoint batch.
let log_root = self.log_root.clone();
let parquet_handler = engine.get_parquet_handler().clone();
let checkpoint_stream = if need_file_actions {

// If the schema being read does not contain add/remove actions (e.g., when replaying for metadata & protocol),
// then sidecar files do not need to be read, so we can directly return the checkpoint batch.

// If the schema does contain add/remove actions, we need to read the sidecar files for sidecar batches
// to replace the checkpoint batch.
// If the schema contains file actions, we need to read the sidecar files to replace the checkpoint batch.

// Multi-part checkpoints do not contain sidecar actions, so in those cases, we return the checkpoint batch as is.
let checkpoint_stream = if need_file_actions && !is_multi_part_checkpoint {
Left(
actions
// Flatten the new batches returned. The new batches could be:
// - the checkpoint batch itself if no sidecar actions are present in the batch
// - 1 or more sidecar batches referenced in the checkpoint batch by sidecar actions
.flat_map(move |batch_result| match batch_result {
Ok(checkpoint_batch) => Right(
Self::create_stream_for_checkpoint_batch(
checkpoint_batch,
Self::process_checkpoint_batch(
parquet_handler.clone(),
log_root.clone(),
is_multi_part_checkpoint,
checkpoint_batch,
)
.map_or_else(|e| Left(std::iter::once(Err(e))), Right),
.map_or_else(|e| Left(std::iter::once(Err(e))), Right)
.map_ok(|batch| (batch, false)),
),
Err(e) => Left(std::iter::once(Err(e))),
}),
Expand All @@ -302,24 +302,7 @@ impl LogSegment {
Right(actions.map_ok(|batch| (batch, false)))
};

return Ok(checkpoint_stream);
}

fn create_stream_for_checkpoint_batch(
checkpoint_batch: Box<dyn EngineData>,
handler: Arc<dyn ParquetHandler>,
log_root_ref: Url,
is_multi_part_checkpoint: bool,
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
// Multi-part checkpoints do not have sidecars, so we can return the batch as is.
if !is_multi_part_checkpoint {
match Self::process_checkpoint_batch(handler, log_root_ref, checkpoint_batch) {
Ok(iterator) => Ok(Right(iterator.map_ok(|batch| (batch, false)))),
Err(e) => Ok(Left(std::iter::once(Err(e)))),
}
} else {
Ok(Left(std::iter::once(Ok((checkpoint_batch, false)))))
}
Ok(checkpoint_stream)
}

fn process_checkpoint_batch(
Expand Down Expand Up @@ -355,6 +338,25 @@ impl LogSegment {
)?))
}

// Helper function to read checkpoint files based on the file type
fn read_checkpoint_files(
engine: &dyn Engine,
checkpoint_parts: Vec<FileMeta>,
schema: SchemaRef,
predicate: Option<ExpressionRef>,
is_json: bool,
) -> DeltaResult<FileDataReadResultIterator> {
if is_json {
engine
.get_json_handler()
.read_json_files(&checkpoint_parts, schema, predicate)
} else {
engine
.get_parquet_handler()
.read_parquet_files(&checkpoint_parts, schema, predicate)
}
}

// Helper function to convert a single sidecar action to a FileMeta
fn sidecar_to_filemeta(sidecar: &Sidecar, log_root: &Url) -> Result<FileMeta, Error> {
let location = log_root.join("_sidecars/")?.join(&sidecar.path)?;
Expand Down

0 comments on commit 3ea5fbb

Please sign in to comment.