diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index cb0a9c9cd..91ede4f32 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -524,7 +524,6 @@ pub struct SetTransaction { /// file actions. This action is only allowed in checkpoints following the V2 spec. /// /// [More info]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#sidecar-file-information -#[allow(unused)] //TODO: Remove once we implement V2 checkpoint file processing #[derive(Schema, Debug, PartialEq)] #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] pub(crate) struct Sidecar { diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs index 72747ac6a..36a2c7faf 100644 --- a/kernel/src/actions/visitors.rs +++ b/kernel/src/actions/visitors.rs @@ -352,7 +352,7 @@ impl RowVisitor for CdcVisitor { )) ); for i in 0..row_count { - // Since path column is required, use it to detect presence of an Add action + // Since path column is required, use it to detect presence of a Cdc action if let Some(path) = getters[0].get_opt(i, "cdc.path")? { self.cdcs.push(Self::visit_cdc(i, path, getters)?); } @@ -438,7 +438,6 @@ impl RowVisitor for SetTransactionVisitor { } } -#[allow(unused)] //TODO: Remove once we implement V2 checkpoint file processing #[derive(Default)] #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] pub(crate) struct SidecarVisitor { @@ -475,7 +474,7 @@ impl RowVisitor for SidecarVisitor { )) ); for i in 0..row_count { - // Since path column is required, use it to detect presence of a sidecar action + // Since path column is required, use it to detect presence of a Sidecar action if let Some(path) = getters[0].get_opt(i, "sidecar.path")? { self.sidecars.push(Self::visit_sidecar(i, path, getters)?); } diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index b4f255c57..dfdad5209 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -1,13 +1,18 @@ //! Represents a segment of a delta log. [`LogSegment`] wraps a set of checkpoint and commit //! files. -use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME}; +use crate::actions::visitors::SidecarVisitor; +use crate::actions::{ + get_log_add_schema, get_log_schema, Metadata, Protocol, Sidecar, ADD_NAME, METADATA_NAME, + PROTOCOL_NAME, REMOVE_NAME, SIDECAR_NAME, +}; use crate::path::{LogPathFileType, ParsedLogPath}; use crate::schema::SchemaRef; use crate::snapshot::CheckpointMetadata; use crate::utils::require; use crate::{ - DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileSystemClient, Version, + DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileMeta, FileSystemClient, + ParquetHandler, RowVisitor, Version, }; use itertools::Itertools; use std::collections::HashMap; @@ -213,17 +218,148 @@ impl LogSegment { .read_json_files(&commit_files, commit_read_schema, meta_predicate.clone())? .map_ok(|batch| (batch, true)); - let checkpoint_parts: Vec<_> = self + let checkpoint_stream = + self.create_checkpoint_stream(engine, checkpoint_read_schema, meta_predicate)?; + + Ok(commit_stream.chain(checkpoint_stream)) + } + + /// Returns an iterator over checkpoint data, processing sidecar files when necessary. + /// + /// By default, `create_checkpoint_stream` checks for the presence of sidecar files, and + /// reads their contents if present. 
Checking for sidecar files is skipped if:
+    /// - The checkpoint is a multi-part checkpoint
+    /// - The checkpoint read schema does not contain the add action
+    ///
+    /// For single-part checkpoints, any referenced sidecar files are processed. These
+    /// sidecar files contain the actual add actions that would otherwise be
+    /// stored directly in the checkpoint. The sidecar file batches are chained to the
+    /// checkpoint batch in the top level iterator to be returned.
+    fn create_checkpoint_stream(
+        &self,
+        engine: &dyn Engine,
+        checkpoint_read_schema: SchemaRef,
+        meta_predicate: Option<ExpressionRef>,
+    ) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
+        let need_file_actions = checkpoint_read_schema.contains(ADD_NAME)
+            | checkpoint_read_schema.contains(REMOVE_NAME);
+        require!(
+            !need_file_actions || checkpoint_read_schema.contains(SIDECAR_NAME),
+            Error::invalid_checkpoint(
+                "If the checkpoint read schema contains file actions, it must contain the sidecar column"
+            )
+        );
+
+        let checkpoint_file_meta: Vec<_> = self
             .checkpoint_parts
             .iter()
             .map(|f| f.location.clone())
             .collect();
-        let checkpoint_stream = engine
-            .get_parquet_handler()
-            .read_parquet_files(&checkpoint_parts, checkpoint_read_schema, meta_predicate)?
-            .map_ok(|batch| (batch, false));
-        Ok(commit_stream.chain(checkpoint_stream))
+        let parquet_handler = engine.get_parquet_handler();
+
+        // Historically, we had a shared file reader trait for JSON and Parquet handlers,
+        // but it was removed to avoid unnecessary coupling. This is a concrete case
+        // where it *could* have been useful, but for now, we're keeping them separate.
+        // If similar patterns start appearing elsewhere, we should reconsider that decision.
+        let actions = if self
+            .checkpoint_parts
+            .first()
+            .is_some_and(|p| p.extension == "json")
+        {
+            engine.get_json_handler().read_json_files(
+                &checkpoint_file_meta,
+                checkpoint_read_schema,
+                meta_predicate,
+            )?
+        } else {
+            parquet_handler.read_parquet_files(
+                &checkpoint_file_meta,
+                checkpoint_read_schema,
+                meta_predicate,
+            )?
+        };
+        let log_root = self.log_root.clone();
+
+        let actions_iter = actions
+            .map(move |checkpoint_batch_result| -> DeltaResult<_> {
+                let checkpoint_batch = checkpoint_batch_result?;
+                // This closure maps the checkpoint batch to an iterator of batches
+                // by chaining the checkpoint batch with sidecar batches if they exist.
+
+                // 1. In the case where the schema does not contain the add action, we return the checkpoint
+                // batch directly as sidecar files only have to be read when the schema contains the add action.
+                // 2. Multi-part checkpoint batches never have sidecar actions, so the batch is returned as-is.
+                let sidecar_content = if need_file_actions && checkpoint_file_meta.len() == 1 {
+                    Self::process_sidecars(
+                        parquet_handler.clone(), // cheap Arc clone
+                        log_root.clone(),
+                        checkpoint_batch.as_ref(),
+                    )?
+                } else {
+                    None
+                };
+
+                let combined_batches = std::iter::once(Ok(checkpoint_batch))
+                    .chain(sidecar_content.into_iter().flatten())
+                    // The boolean flag indicates whether the batch originated from a commit file (true) or a checkpoint file (false).
+                    .map_ok(|sidecar_batch| (sidecar_batch, false));
+
+                Ok(combined_batches)
+            })
+            .flatten_ok()
+            .map(|result| result?); // result-result to result
+
+        Ok(actions_iter)
+    }
+
+    /// Processes sidecar files for the given checkpoint batch.
+    ///
+    /// This function extracts any sidecar file references from the provided batch.
+    /// Each sidecar file is read and an iterator of sidecar file batches is returned
+    fn process_sidecars(
+        parquet_handler: Arc<dyn ParquetHandler>,
+        log_root: Url,
+        batch: &dyn EngineData,
+    ) -> DeltaResult<Option<impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>> {
+        // Visit the rows of the checkpoint batch to extract sidecar file references
+        let mut visitor = SidecarVisitor::default();
+        visitor.visit_rows_of(batch)?;
+
+        // If there are no sidecar files, return early
+        if visitor.sidecars.is_empty() {
+            return Ok(None);
+        }
+
+        let sidecar_files: Vec<_> = visitor
+            .sidecars
+            .iter()
+            .map(|sidecar| Self::sidecar_to_filemeta(sidecar, &log_root))
+            .try_collect()?;
+
+        // Read the sidecar files and return an iterator of sidecar file batches
+        Ok(Some(parquet_handler.read_parquet_files(
+            &sidecar_files,
+            get_log_add_schema().clone(),
+            None,
+        )?))
+    }
+
+    /// Convert a Sidecar record to a FileMeta.
+    ///
+    /// This helper first builds the URL by joining the provided log_root with
+    /// the "_sidecars/" folder and the given sidecar path.
+    fn sidecar_to_filemeta(sidecar: &Sidecar, log_root: &Url) -> DeltaResult<FileMeta> {
+        Ok(FileMeta {
+            location: log_root.join("_sidecars/")?.join(&sidecar.path)?,
+            last_modified: sidecar.modification_time,
+            size: sidecar.size_in_bytes.try_into().map_err(|_| {
+                Error::generic(format!(
+                    "Failed to convert sidecar size {} to usize",
+                    sidecar.size_in_bytes
+                ))
+            })?,
+        })
+    }
 
     // Get the most up-to-date Protocol and Metadata actions
diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs
index 5db1c4581..4e050944d 100644
--- a/kernel/src/log_segment/tests.rs
+++ b/kernel/src/log_segment/tests.rs
@@ -4,12 +4,25 @@ use itertools::Itertools;
 use object_store::{memory::InMemory, path::Path, ObjectStore};
 use url::Url;
 
+use crate::actions::visitors::AddVisitor;
+use crate::actions::{
+    get_log_add_schema, get_log_schema, Add, Sidecar, ADD_NAME, METADATA_NAME, REMOVE_NAME,
+    SIDECAR_NAME,
+};
+use crate::engine::arrow_data::ArrowEngineData;
 use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
 use crate::engine::default::filesystem::ObjectStoreFileSystemClient;
+use crate::engine::default::DefaultEngine;
 use crate::engine::sync::SyncEngine;
 use crate::log_segment::LogSegment;
+use crate::parquet::arrow::ArrowWriter;
+use crate::path::ParsedLogPath;
+use crate::scan::test_utils::{
+    add_batch_simple, add_batch_with_remove, sidecar_batch_with_given_paths,
+};
 use crate::snapshot::CheckpointMetadata;
-use crate::{FileSystemClient, Table};
+use crate::utils::test_utils::{assert_batch_matches, Action};
+use crate::{DeltaResult, Engine, EngineData, FileMeta, FileSystemClient, RowVisitor, Table};
 use test_utils::delta_path_for_version;
 
 // NOTE: In addition to testing the meta-predicate for metadata replay, this test also verifies
@@ -107,6 +120,95 @@ fn build_log_with_paths_and_checkpoint(
     (Box::new(client), log_root)
 }
 
+// Create an in-memory store and return the store and the URL for the store's _delta_log directory.
+fn new_in_memory_store() -> (Arc<InMemory>, Url) {
+    (
+        Arc::new(InMemory::new()),
+        Url::parse("memory:///")
+            .unwrap()
+            .join("_delta_log/")
+            .unwrap(),
+    )
+}
+
+// Writes a record batch obtained from engine data to the in-memory store at a given path.
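+// The write is performed on a throwaway Tokio runtime created inside the helper, so test code stays synchronous.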
+fn write_parquet_to_store(
+    store: &Arc<InMemory>,
+    path: String,
+    data: Box<dyn EngineData>,
+) -> DeltaResult<()> {
+    let batch = ArrowEngineData::try_from_engine_data(data)?;
+    let record_batch = batch.record_batch();
+
+    let mut buffer = vec![];
+    let mut writer = ArrowWriter::try_new(&mut buffer, record_batch.schema(), None)?;
+    writer.write(record_batch)?;
+    writer.close()?;
+
+    tokio::runtime::Runtime::new()
+        .expect("create tokio runtime")
+        .block_on(async { store.put(&Path::from(path), buffer.into()).await })?;
+
+    Ok(())
+}
+
+/// Writes all actions to a _delta_log parquet checkpoint file in the store.
+/// This function formats the provided filename into the _delta_log directory.
+fn add_checkpoint_to_store(
+    store: &Arc<InMemory>,
+    data: Box<dyn EngineData>,
+    filename: &str,
+) -> DeltaResult<()> {
+    let path = format!("_delta_log/{}", filename);
+    write_parquet_to_store(store, path, data)
+}
+
+/// Writes all actions to a _delta_log/_sidecars file in the store.
+/// This function formats the provided filename into the _sidecars subdirectory.
+fn add_sidecar_to_store(
+    store: &Arc<InMemory>,
+    data: Box<dyn EngineData>,
+    filename: &str,
+) -> DeltaResult<()> {
+    let path = format!("_delta_log/_sidecars/{}", filename);
+    write_parquet_to_store(store, path, data)
+}
+
+/// Writes all actions to a _delta_log json checkpoint file in the store.
+/// This function formats the provided filename into the _delta_log directory.
+fn write_json_to_store(
+    store: &Arc<InMemory>,
+    actions: Vec<Action>,
+    filename: &str,
+) -> DeltaResult<()> {
+    let json_lines: Vec<String> = actions
+        .into_iter()
+        .map(|action| serde_json::to_string(&action).expect("action to string"))
+        .collect();
+    let content = json_lines.join("\n");
+    let checkpoint_path = format!("_delta_log/{}", filename);
+
+    tokio::runtime::Runtime::new()
+        .expect("create tokio runtime")
+        .block_on(async {
+            store
+                .put(&Path::from(checkpoint_path), content.into())
+                .await
+        })?;
+
+    Ok(())
+}
+
+fn create_log_path(path: &str) -> ParsedLogPath {
+    ParsedLogPath::try_from(FileMeta {
+        location: Url::parse(path).expect("Invalid file URL"),
+        last_modified: 0,
+        size: 0,
+    })
+    .unwrap()
+    .unwrap()
+}
+
 #[test]
 fn build_snapshot_with_unsupported_uuid_checkpoint() {
     let (client, log_root) = build_log_with_paths_and_checkpoint(
@@ -123,7 +225,6 @@ fn build_snapshot_with_unsupported_uuid_checkpoint() {
         ],
         None,
     );
-
     let log_segment = LogSegment::for_snapshot(client.as_ref(), log_root, None, None).unwrap();
     let commit_files = log_segment.ascending_commit_files;
     let checkpoint_parts = log_segment.checkpoint_parts;
@@ -620,3 +721,453 @@ fn table_changes_fails_with_larger_start_version_than_end() {
     let log_segment_res = LogSegment::for_table_changes(client.as_ref(), log_root, 1, Some(0));
     assert!(log_segment_res.is_err());
 }
+#[test]
+fn test_sidecar_to_filemeta_valid_paths() -> DeltaResult<()> {
+    let log_root = Url::parse("file:///var/_delta_log/")?;
+    let test_cases = [
+        (
+            "example.parquet",
+            "file:///var/_delta_log/_sidecars/example.parquet",
+        ),
+        (
+            "file:///var/_delta_log/_sidecars/example.parquet",
+            "file:///var/_delta_log/_sidecars/example.parquet",
+        ),
+        (
+            "test/test/example.parquet",
+            "file:///var/_delta_log/_sidecars/test/test/example.parquet",
+        ),
+    ];
+
+    for (input_path, expected_url) in test_cases.into_iter() {
+        let sidecar = Sidecar {
+            path: input_path.to_string(),
+            modification_time: 0,
+            size_in_bytes: 1000,
+            tags: None,
+        };
+
+        let filemeta = LogSegment::sidecar_to_filemeta(&sidecar, &log_root)?;
+        assert_eq!(
+            filemeta.location.as_str(),
+            expected_url,
+            "Mismatch for input path: 
{}", + input_path + ); + } + Ok(()) +} + +#[test] +fn test_checkpoint_batch_with_no_sidecars_returns_none() -> DeltaResult<()> { + let (_, log_root) = new_in_memory_store(); + let engine = Arc::new(SyncEngine::new()); + let checkpoint_batch = add_batch_simple(get_log_schema().clone()); + + let mut iter = LogSegment::process_sidecars( + engine.get_parquet_handler(), + log_root, + checkpoint_batch.as_ref(), + )? + .into_iter() + .flatten(); + + // Assert no batches are returned + assert!(iter.next().is_none()); + + Ok(()) +} + +#[test] +fn test_checkpoint_batch_with_sidecars_returns_sidecar_batches() -> DeltaResult<()> { + let (store, log_root) = new_in_memory_store(); + let engine = DefaultEngine::new( + store.clone(), + Path::from("/"), + Arc::new(TokioBackgroundExecutor::new()), + ); + + add_sidecar_to_store( + &store, + add_batch_simple(get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?), + "sidecarfile1.parquet", + )?; + add_sidecar_to_store( + &store, + add_batch_with_remove(get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?), + "sidecarfile2.parquet", + )?; + + let checkpoint_batch = sidecar_batch_with_given_paths( + vec!["sidecarfile1.parquet", "sidecarfile2.parquet"], + get_log_schema().project(&[ADD_NAME, SIDECAR_NAME])?, + ); + + let mut iter = LogSegment::process_sidecars( + engine.get_parquet_handler(), + log_root, + checkpoint_batch.as_ref(), + )? + .into_iter() + .flatten(); + + // Assert the correctness of batches returned + let sidecar_schema = get_log_add_schema().clone(); + assert_batch_matches( + iter.next().unwrap()?, + add_batch_simple(sidecar_schema.clone()), + ); + assert_batch_matches( + iter.next().unwrap()?, + add_batch_with_remove(sidecar_schema.clone()), + ); + assert!(iter.next().is_none()); + + Ok(()) +} + +#[test] +fn test_checkpoint_batch_with_sidecar_files_that_do_not_exist() -> DeltaResult<()> { + let (store, log_root) = new_in_memory_store(); + let engine = DefaultEngine::new( + store.clone(), + Path::from("/"), + Arc::new(TokioBackgroundExecutor::new()), + ); + + let checkpoint_batch = sidecar_batch_with_given_paths( + vec!["sidecarfile1.parquet", "sidecarfile2.parquet"], + get_log_schema().clone(), + ); + + let mut iter = LogSegment::process_sidecars( + engine.get_parquet_handler(), + log_root, + checkpoint_batch.as_ref(), + )? + .into_iter() + .flatten(); + + // Assert that an error is returned when trying to read sidecar files that do not exist + let err = iter.next().unwrap(); + assert!(err.is_err()); + + Ok(()) +} + +#[test] +fn test_create_checkpoint_stream_errors_when_schema_has_remove_but_no_sidecar_action( +) -> DeltaResult<()> { + let engine = SyncEngine::new(); + let log_root = Url::parse("s3://example-bucket/logs/")?; + + // Create the stream over checkpoint batches. + let log_segment = LogSegment::try_new( + vec![], + vec![create_log_path("file:///00000000000000000001.parquet")], + log_root, + None, + )?; + let result = log_segment.create_checkpoint_stream( + &engine, + get_log_schema().project(&[REMOVE_NAME])?, + None, + ); + + // Errors because the schema has an REMOVE action but no SIDECAR action. + assert!(result.is_err()); + + Ok(()) +} + +#[test] +fn test_create_checkpoint_stream_errors_when_schema_has_add_but_no_sidecar_action( +) -> DeltaResult<()> { + let engine = SyncEngine::new(); + let log_root = Url::parse("s3://example-bucket/logs/")?; + + // Create the stream over checkpoint batches. 
+ let log_segment = LogSegment::try_new( + vec![], + vec![create_log_path("file:///00000000000000000001.parquet")], + log_root, + None, + )?; + let result = log_segment.create_checkpoint_stream(&engine, get_log_add_schema().clone(), None); + + // Errors because the schema has an ADD action but no SIDECAR action. + assert!(result.is_err()); + + Ok(()) +} + +#[test] +fn test_create_checkpoint_stream_returns_checkpoint_batches_as_is_if_schema_has_no_file_actions( +) -> DeltaResult<()> { + let (store, log_root) = new_in_memory_store(); + let engine = DefaultEngine::new( + store.clone(), + Path::from("/"), + Arc::new(TokioBackgroundExecutor::new()), + ); + add_checkpoint_to_store( + &store, + // Create a checkpoint batch with sidecar actions to verify that the sidecar actions are not read. + sidecar_batch_with_given_paths(vec!["sidecar1.parquet"], get_log_schema().clone()), + "00000000000000000001.checkpoint.parquet", + )?; + + let checkpoint_one_file = log_root + .join("00000000000000000001.checkpoint.parquet")? + .to_string(); + + let v2_checkpoint_read_schema = get_log_schema().project(&[METADATA_NAME])?; + + let log_segment = LogSegment::try_new( + vec![], + vec![create_log_path(&checkpoint_one_file)], + log_root, + None, + )?; + let mut iter = + log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?; + + // Assert that the first batch returned is from reading checkpoint file 1 + let (first_batch, is_log_batch) = iter.next().unwrap()?; + assert!(!is_log_batch); + assert_batch_matches( + first_batch, + sidecar_batch_with_given_paths(vec!["sidecar1.parquet"], v2_checkpoint_read_schema.clone()), + ); + assert!(iter.next().is_none()); + + Ok(()) +} + +#[test] +fn test_create_checkpoint_stream_returns_checkpoint_batches_if_checkpoint_is_multi_part( +) -> DeltaResult<()> { + let (store, log_root) = new_in_memory_store(); + let engine = DefaultEngine::new( + store.clone(), + Path::from("/"), + Arc::new(TokioBackgroundExecutor::new()), + ); + + // Multi-part checkpoints can never have sidecar actions. + // We place batches with sidecar actions in multi-part checkpoints to verify we do not read the actions, as we + // should instead short-circuit and return the checkpoint batches as-is when encountering multi-part checkpoints. 
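+    // Multi-part checkpoint filenames follow the `<version>.checkpoint.<part>.<total_parts>.parquet` pattern.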
+ let checkpoint_part_1 = "00000000000000000001.checkpoint.0000000001.0000000002.parquet"; + let checkpoint_part_2 = "00000000000000000001.checkpoint.0000000002.0000000002.parquet"; + + add_checkpoint_to_store( + &store, + sidecar_batch_with_given_paths(vec!["sidecar1.parquet"], get_log_schema().clone()), + checkpoint_part_1, + )?; + add_checkpoint_to_store( + &store, + sidecar_batch_with_given_paths(vec!["sidecar2.parquet"], get_log_schema().clone()), + checkpoint_part_2, + )?; + + let checkpoint_one_file = log_root.join(checkpoint_part_1)?.to_string(); + let checkpoint_two_file = log_root.join(checkpoint_part_2)?.to_string(); + + let v2_checkpoint_read_schema = get_log_schema().project(&[ADD_NAME, SIDECAR_NAME])?; + + let log_segment = LogSegment::try_new( + vec![], + vec![ + create_log_path(&checkpoint_one_file), + create_log_path(&checkpoint_two_file), + ], + log_root, + None, + )?; + let mut iter = + log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?; + + // Assert the correctness of batches returned + for expected_sidecar in ["sidecar1.parquet", "sidecar2.parquet"].iter() { + let (batch, is_log_batch) = iter.next().unwrap()?; + assert!(!is_log_batch); + assert_batch_matches( + batch, + sidecar_batch_with_given_paths( + vec![expected_sidecar], + v2_checkpoint_read_schema.clone(), + ), + ); + } + assert!(iter.next().is_none()); + + Ok(()) +} + +#[test] +fn test_create_checkpoint_stream_reads_parquet_checkpoint_batch_without_sidecars() -> DeltaResult<()> +{ + let (store, log_root) = new_in_memory_store(); + let engine = DefaultEngine::new( + store.clone(), + Path::from("/"), + Arc::new(TokioBackgroundExecutor::new()), + ); + + add_checkpoint_to_store( + &store, + add_batch_simple(get_log_schema().clone()), + "00000000000000000001.checkpoint.parquet", + )?; + + let checkpoint_one_file = log_root + .join("00000000000000000001.checkpoint.parquet")? + .to_string(); + + let v2_checkpoint_read_schema = get_log_schema().project(&[ADD_NAME, SIDECAR_NAME])?; + + let log_segment = LogSegment::try_new( + vec![], + vec![create_log_path(&checkpoint_one_file)], + log_root, + None, + )?; + let mut iter = + log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?; + + // Assert that the first batch returned is from reading checkpoint file 1 + let (first_batch, is_log_batch) = iter.next().unwrap()?; + assert!(!is_log_batch); + assert_batch_matches( + first_batch, + add_batch_simple(v2_checkpoint_read_schema.clone()), + ); + assert!(iter.next().is_none()); + + Ok(()) +} + +#[test] +fn test_create_checkpoint_stream_reads_json_checkpoint_batch_without_sidecars() -> DeltaResult<()> { + let (store, log_root) = new_in_memory_store(); + let engine = DefaultEngine::new( + store.clone(), + Path::from("/"), + Arc::new(TokioBackgroundExecutor::new()), + ); + + write_json_to_store( + &store, + vec![Action::Add(Add { + path: "fake_path_1".into(), + data_change: true, + ..Default::default() + })], + "00000000000000000001.checkpoint.json", + )?; + + let checkpoint_one_file = log_root + .join("00000000000000000001.checkpoint.json")? 
+ .to_string(); + + let v2_checkpoint_read_schema = get_log_schema().project(&[ADD_NAME, SIDECAR_NAME])?; + + let log_segment = LogSegment::try_new( + vec![], + vec![create_log_path(&checkpoint_one_file)], + log_root, + None, + )?; + let mut iter = + log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?; + + // Assert that the first batch returned is from reading checkpoint file 1 + let (first_batch, is_log_batch) = iter.next().unwrap()?; + assert!(!is_log_batch); + let mut visitor = AddVisitor::default(); + visitor.visit_rows_of(&*first_batch)?; + assert!(visitor.adds.len() == 1); + assert!(visitor.adds[0].path == "fake_path_1"); + + assert!(iter.next().is_none()); + + Ok(()) +} + +// Tests the end-to-end process of creating a checkpoint stream. +// Verifies that: +// - The checkpoint file is read and produces batches containing references to sidecar files. +// - As sidecar references are present, the corresponding sidecar files are processed correctly. +// - Batches from both the checkpoint file and sidecar files are returned. +// - Each returned batch is correctly flagged with is_log_batch set to false +#[test] +fn test_create_checkpoint_stream_reads_checkpoint_file_and_returns_sidecar_batches( +) -> DeltaResult<()> { + let (store, log_root) = new_in_memory_store(); + let engine = DefaultEngine::new( + store.clone(), + Path::from("/"), + Arc::new(TokioBackgroundExecutor::new()), + ); + + add_checkpoint_to_store( + &store, + sidecar_batch_with_given_paths( + vec!["sidecarfile1.parquet", "sidecarfile2.parquet"], + get_log_schema().clone(), + ), + "00000000000000000001.checkpoint.parquet", + )?; + + add_sidecar_to_store( + &store, + add_batch_simple(get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?), + "sidecarfile1.parquet", + )?; + add_sidecar_to_store( + &store, + add_batch_with_remove(get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?), + "sidecarfile2.parquet", + )?; + + let checkpoint_file_path = log_root + .join("00000000000000000001.checkpoint.parquet")? 
+ .to_string(); + + let v2_checkpoint_read_schema = get_log_schema().project(&[ADD_NAME, SIDECAR_NAME])?; + + let log_segment = LogSegment::try_new( + vec![], + vec![create_log_path(&checkpoint_file_path)], + log_root, + None, + )?; + let mut iter = + log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?; + + // Assert that the first batch returned is from reading checkpoint file 1 + let (first_batch, is_log_batch) = iter.next().unwrap()?; + assert!(!is_log_batch); + assert_batch_matches( + first_batch, + sidecar_batch_with_given_paths( + vec!["sidecarfile1.parquet", "sidecarfile2.parquet"], + get_log_schema().project(&[ADD_NAME, SIDECAR_NAME])?, + ), + ); + // Assert that the second batch returned is from reading sidecarfile1 + let sidecar_schema = get_log_add_schema(); + let (second_batch, is_log_batch) = iter.next().unwrap()?; + assert!(!is_log_batch); + assert_batch_matches(second_batch, add_batch_simple(sidecar_schema.clone())); + + // Assert that the second batch returned is from reading sidecarfile2 + let (third_batch, is_log_batch) = iter.next().unwrap()?; + assert!(!is_log_batch); + assert_batch_matches(third_batch, add_batch_with_remove(sidecar_schema.clone())); + + assert!(iter.next().is_none()); + + Ok(()) +} diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index 177996a80..cebce5b6c 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -323,6 +323,7 @@ pub(crate) fn scan_action_iter( mod tests { use std::{collections::HashMap, sync::Arc}; + use crate::actions::get_log_schema; use crate::expressions::{column_name, Scalar}; use crate::scan::state::{DvInfo, Stats}; use crate::scan::test_utils::{ @@ -364,7 +365,7 @@ mod tests { #[test] fn test_scan_action_iter() { run_with_validate_callback( - vec![add_batch_simple()], + vec![add_batch_simple(get_log_schema().clone())], None, // not testing schema None, // not testing transform &[true, false], @@ -376,7 +377,7 @@ mod tests { #[test] fn test_scan_action_iter_with_remove() { run_with_validate_callback( - vec![add_batch_with_remove()], + vec![add_batch_with_remove(get_log_schema().clone())], None, // not testing schema None, // not testing transform &[false, false, true, false], @@ -387,7 +388,7 @@ mod tests { #[test] fn test_no_transforms() { - let batch = vec![add_batch_simple()]; + let batch = vec![add_batch_simple(get_log_schema().clone())]; let logical_schema = Arc::new(crate::schema::StructType::new(vec![])); let iter = scan_action_iter( &SyncEngine::new(), diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 0672345eb..13a15ffb7 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -11,7 +11,7 @@ use url::Url; use crate::actions::deletion_vector::{ deletion_treemap_to_bools, split_vector, DeletionVectorDescriptor, }; -use crate::actions::{get_log_add_schema, get_log_schema, ADD_NAME, REMOVE_NAME}; +use crate::actions::{get_log_schema, ADD_NAME, REMOVE_NAME, SIDECAR_NAME}; use crate::expressions::{ColumnName, Expression, ExpressionRef, ExpressionTransform, Scalar}; use crate::predicates::{DefaultPredicateEvaluator, EmptyColumnResolver}; use crate::scan::state::{DvInfo, Stats}; @@ -428,7 +428,7 @@ impl Scan { engine: &dyn Engine, ) -> DeltaResult, bool)>> + Send> { let commit_read_schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?; - let checkpoint_read_schema = get_log_add_schema().clone(); + let checkpoint_read_schema = get_log_schema().project(&[ADD_NAME, SIDECAR_NAME])?; // NOTE: We don't pass any 
meta-predicate because we expect no meaningful row group skipping
         // when ~every checkpoint file will contain the adds and removes we are looking for.
@@ -663,10 +663,10 @@ pub fn selection_vector(
 // some utils that are used in file_stream.rs and state.rs tests
 #[cfg(test)]
 pub(crate) mod test_utils {
-    use std::sync::Arc;
-
     use crate::arrow::array::{RecordBatch, StringArray};
     use crate::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
+    use itertools::Itertools;
+    use std::sync::Arc;
 
     use crate::{
         actions::get_log_schema,
@@ -690,23 +690,54 @@ pub(crate) mod test_utils {
         Box::new(ArrowEngineData::new(batch))
     }
 
-    // simple add
-    pub(crate) fn add_batch_simple() -> Box<dyn EngineData> {
+    // Generates a batch of sidecar actions with the given paths.
+    // The schema is provided because null columns affect equality checks.
+    pub(crate) fn sidecar_batch_with_given_paths(
+        paths: Vec<&str>,
+        output_schema: SchemaRef,
+    ) -> Box<dyn EngineData> {
+        let handler = SyncJsonHandler {};
+
+        let mut json_strings: Vec<String> = paths
+            .iter()
+            .map(|path| {
+                format!(
+                    r#"{{"sidecar":{{"path":"{path}","sizeInBytes":9268,"modificationTime":1714496113961,"tags":{{"tag_foo":"tag_bar"}}}}}}"#
+                )
+            })
+            .collect();
+        json_strings.push(r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#.to_string());
+
+        let json_strings_array: StringArray =
+            json_strings.iter().map(|s| s.as_str()).collect_vec().into();
+
+        let parsed = handler
+            .parse_json(
+                string_array_to_engine_data(json_strings_array),
+                output_schema,
+            )
+            .unwrap();
+
+        ArrowEngineData::try_from_engine_data(parsed).unwrap()
+    }
+
+    // Generates a batch with an add action.
+    // The schema is provided because null columns affect equality checks.
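+    // Callers pass whichever schema the JSON should be parsed with, e.g. `get_log_schema().clone()`
+    // or a projection such as `get_log_schema().project(&[ADD_NAME, REMOVE_NAME])`.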
+    pub(crate) fn add_batch_simple(output_schema: SchemaRef) -> Box<dyn EngineData> {
         let handler = SyncJsonHandler {};
         let json_strings: StringArray = vec![
             r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues": {"date": "2017-12-10"},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
             r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
         ]
         .into();
-        let output_schema = get_log_schema().clone();
         let parsed = handler
             .parse_json(string_array_to_engine_data(json_strings), output_schema)
             .unwrap();
         ArrowEngineData::try_from_engine_data(parsed).unwrap()
     }
 
-    // add batch with a removed file
-    pub(crate) fn add_batch_with_remove() -> Box<dyn EngineData> {
+    // An add batch with a removed file parsed with the schema provided
+    pub(crate) fn add_batch_with_remove(output_schema: SchemaRef) -> Box<dyn EngineData> {
         let handler = SyncJsonHandler {};
         let json_strings: StringArray = vec![
             r#"{"remove":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c001.snappy.parquet","deletionTimestamp":1677811194426,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":635,"tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#,
@@ -715,7 +746,6 @@ pub(crate) mod test_utils {
             r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
         ]
         .into();
-        let output_schema = get_log_schema().clone();
         let parsed = handler
             .parse_json(string_array_to_engine_data(json_strings), output_schema)
             .unwrap();
diff --git a/kernel/src/scan/state.rs b/kernel/src/scan/state.rs
index 85eb6e4a7..0dfecc4ee 100644
--- a/kernel/src/scan/state.rs
+++ b/kernel/src/scan/state.rs
@@ -243,6 +243,7 @@ impl RowVisitor for ScanFileVisitor<'_, T> {
 mod tests {
     use std::collections::HashMap;
 
+    use crate::actions::get_log_schema;
     use crate::scan::test_utils::{add_batch_simple, run_with_validate_callback};
     use crate::ExpressionRef;
 
@@ -282,7 +283,7 @@
     fn test_simple_visit_scan_data() {
         let context = TestContext { id: 2 };
         run_with_validate_callback(
-            vec![add_batch_simple()],
+            vec![add_batch_simple(get_log_schema().clone())],
             None, // not testing schema
             None, // not testing transform
             &[true, false],
diff --git a/kernel/src/schema/mod.rs b/kernel/src/schema/mod.rs
index 6086a7031..11befb34a 100644
--- a/kernel/src/schema/mod.rs
+++ b/kernel/src/schema/mod.rs
@@ -286,6 +286,10 @@ impl StructType {
         self.fields.values()
     }
 
+    pub fn contains(&self, name: impl AsRef<str>) -> bool {
+        self.fields.contains_key(name.as_ref())
+    }
+
     /// Extracts the name and type of all leaf columns, in schema order. Caller should pass Some
     /// `own_name` if this schema is embedded in a larger struct (e.g. `add.*`) and None if the
     /// schema is a top-level result (e.g. `*`).
diff --git a/kernel/src/utils.rs b/kernel/src/utils.rs
index 8f4fcf818..fd2db2501 100644
--- a/kernel/src/utils.rs
+++ b/kernel/src/utils.rs
@@ -13,6 +13,7 @@ pub(crate) use require;
 
 #[cfg(test)]
 pub(crate) mod test_utils {
+    use crate::arrow::array::RecordBatch;
     use itertools::Itertools;
     use object_store::local::LocalFileSystem;
     use object_store::ObjectStore;
@@ -21,7 +22,11 @@ pub(crate) mod test_utils {
     use tempfile::TempDir;
     use test_utils::delta_path_for_version;
 
-    use crate::actions::{Add, Cdc, CommitInfo, Metadata, Protocol, Remove};
+    use crate::{
+        actions::{Add, Cdc, CommitInfo, Metadata, Protocol, Remove},
+        engine::arrow_data::ArrowEngineData,
+        EngineData,
+    };
 
     #[derive(Serialize)]
     pub(crate) enum Action {
@@ -73,9 +78,23 @@ pub(crate) mod test_utils {
             .await
             .expect("put log file in store");
         }
+
         /// Get the path to the root of the table.
         pub(crate) fn table_root(&self) -> &Path {
             self.dir.path()
         }
     }
+
+    /// Try to convert an `EngineData` into a `RecordBatch`. Panics if not using `ArrowEngineData` from
+    /// the default module
+    fn into_record_batch(engine_data: Box<dyn EngineData>) -> RecordBatch {
+        ArrowEngineData::try_from_engine_data(engine_data)
+            .unwrap()
+            .into()
+    }
+
+    /// Checks that two `EngineData` objects are equal by converting them to `RecordBatch` and comparing
+    pub(crate) fn assert_batch_matches(actual: Box<dyn EngineData>, expected: Box<dyn EngineData>) {
+        assert_eq!(into_record_batch(actual), into_record_batch(expected));
+    }
 }
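Not part of the diff — a minimal, standalone sketch (assuming only the `url` crate) of the path resolution that `sidecar_to_filemeta` relies on and that `test_sidecar_to_filemeta_valid_paths` exercises: `Url::join` resolves a relative sidecar path under the `_sidecars/` base, but replaces the base entirely when handed an absolute URL, which is why pre-qualified sidecar paths pass through unchanged.

use url::Url;

fn main() -> Result<(), url::ParseError> {
    let log_root = Url::parse("file:///var/_delta_log/")?;
    // Same join chain as `sidecar_to_filemeta`: log_root -> "_sidecars/" -> sidecar path.
    let sidecars = log_root.join("_sidecars/")?;

    // Relative path: lands under <log_root>/_sidecars/.
    assert_eq!(
        sidecars.join("example.parquet")?.as_str(),
        "file:///var/_delta_log/_sidecars/example.parquet"
    );

    // Absolute URL: join ignores the base and returns the input location unchanged.
    assert_eq!(
        sidecars
            .join("file:///var/_delta_log/_sidecars/example.parquet")?
            .as_str(),
        "file:///var/_delta_log/_sidecars/example.parquet"
    );
    Ok(())
}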