From 3ea5fbba7aa43989bce2805a2894991320fe2a3b Mon Sep 17 00:00:00 2001 From: sebastian tia Date: Sat, 1 Feb 2025 15:42:00 -0800 Subject: [PATCH] refactor --- kernel/src/log_segment.rs | 104 +++++++++++++++++++------------------- 1 file changed, 53 insertions(+), 51 deletions(-) diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 53d17085f..d64acf22e 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -226,7 +226,9 @@ impl LogSegment { .map(|f| f.location.clone()) .collect(); - let checkpoint_stream = if !checkpoint_parts.is_empty() { + let checkpoint_stream = if checkpoint_parts.is_empty() { + Left(None.into_iter()) + } else { Right(Self::create_checkpoint_stream( self, engine, @@ -234,10 +236,9 @@ impl LogSegment { meta_predicate, checkpoint_parts, )?) - } else { - Left(std::iter::empty()) }; - return Ok(commit_stream.chain(checkpoint_stream)); + + Ok(commit_stream.chain(checkpoint_stream)) } fn create_checkpoint_stream( @@ -247,39 +248,38 @@ impl LogSegment { meta_predicate: Option, checkpoint_parts: Vec, ) -> DeltaResult, bool)>> + Send> { + // We checked that checkpoint_parts is not empty before calling this function let is_json_checkpoint = self.checkpoint_parts[0].extension == "json"; - let actions: FileDataReadResultIterator = if is_json_checkpoint { - engine.get_json_handler().read_json_files( - &checkpoint_parts, - checkpoint_read_schema.clone(), - meta_predicate, - )? - } else { - engine.get_parquet_handler().read_parquet_files( - &checkpoint_parts, - checkpoint_read_schema.clone(), - meta_predicate, - )? - }; + let actions = Self::read_checkpoint_files( + engine, + checkpoint_parts, + checkpoint_read_schema.clone(), + meta_predicate, + is_json_checkpoint, + )?; let need_file_actions = checkpoint_read_schema.contains(ADD_NAME); - if need_file_actions { - // if adds were requested, we need sidecars as well! - require!( - checkpoint_read_schema.contains(SIDECAR_NAME), - Error::generic("Checkpoint read schema must contain SIDECAR_NAME") - ); - } + require!( + !need_file_actions || checkpoint_read_schema.contains(SIDECAR_NAME), + Error::generic( + "If the checkpoint read schema contains file actions, it must contain the sidecar column" + ) + ); let is_multi_part_checkpoint = self.checkpoint_parts.len() > 1; - - // Replay is sometimes passed a schema that doesn't contain the sidecar column. (e.g. when reading metadata & protocol) - // In this case, we do not need to read the sidecar files and can chain the checkpoint batch as is. - // If the schema contains file actions, we need to read the sidecar files to replace the checkpoint batch. let log_root = self.log_root.clone(); let parquet_handler = engine.get_parquet_handler().clone(); - let checkpoint_stream = if need_file_actions { + + // If the schema being read does not contain add/remove actions (e.g., when replaying for metadata & protocol), + // then sidecar files do not need to be read, so we can directly return the checkpoint batch. + + // If the schema does contain add/remove actions, we need to read the sidecar files for sidecar batches + // to replace the checkpoint batch. + // If the schema contains file actions, we need to read the sidecar files to replace the checkpoint batch. + + // Multi-part checkpoints do not contain sidecar actions, so in those cases, we return the checkpoint batch as is. + let checkpoint_stream = if need_file_actions && !is_multi_part_checkpoint { Left( actions // Flatten the new batches returned. The new batches could be: @@ -287,13 +287,13 @@ impl LogSegment { // - 1 or more sidecar batches referenced in the checkpoint batch by sidecar actions .flat_map(move |batch_result| match batch_result { Ok(checkpoint_batch) => Right( - Self::create_stream_for_checkpoint_batch( - checkpoint_batch, + Self::process_checkpoint_batch( parquet_handler.clone(), log_root.clone(), - is_multi_part_checkpoint, + checkpoint_batch, ) - .map_or_else(|e| Left(std::iter::once(Err(e))), Right), + .map_or_else(|e| Left(std::iter::once(Err(e))), Right) + .map_ok(|batch| (batch, false)), ), Err(e) => Left(std::iter::once(Err(e))), }), @@ -302,24 +302,7 @@ impl LogSegment { Right(actions.map_ok(|batch| (batch, false))) }; - return Ok(checkpoint_stream); - } - - fn create_stream_for_checkpoint_batch( - checkpoint_batch: Box, - handler: Arc, - log_root_ref: Url, - is_multi_part_checkpoint: bool, - ) -> DeltaResult, bool)>> + Send> { - // Multi-part checkpoints do not have sidecars, so we can return the batch as is. - if !is_multi_part_checkpoint { - match Self::process_checkpoint_batch(handler, log_root_ref, checkpoint_batch) { - Ok(iterator) => Ok(Right(iterator.map_ok(|batch| (batch, false)))), - Err(e) => Ok(Left(std::iter::once(Err(e)))), - } - } else { - Ok(Left(std::iter::once(Ok((checkpoint_batch, false))))) - } + Ok(checkpoint_stream) } fn process_checkpoint_batch( @@ -355,6 +338,25 @@ impl LogSegment { )?)) } + // Helper function to read checkpoint files based on the file type + fn read_checkpoint_files( + engine: &dyn Engine, + checkpoint_parts: Vec, + schema: SchemaRef, + predicate: Option, + is_json: bool, + ) -> DeltaResult { + if is_json { + engine + .get_json_handler() + .read_json_files(&checkpoint_parts, schema, predicate) + } else { + engine + .get_parquet_handler() + .read_parquet_files(&checkpoint_parts, schema, predicate) + } + } + // Helper function to convert a single sidecar action to a FileMeta fn sidecar_to_filemeta(sidecar: &Sidecar, log_root: &Url) -> Result { let location = log_root.join("_sidecars/")?.join(&sidecar.path)?;