From 6ac530d203e3e723b14893346caddf06507356e6 Mon Sep 17 00:00:00 2001
From: sebastian tia
Date: Wed, 5 Feb 2025 14:45:42 -0800
Subject: [PATCH] assert batches returned in replay test

---
 kernel/src/log_segment/tests.rs | 47 +++++++++++++++++++++------------
 1 file changed, 30 insertions(+), 17 deletions(-)

diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs
index 3156527d8..6746a71df 100644
--- a/kernel/src/log_segment/tests.rs
+++ b/kernel/src/log_segment/tests.rs
@@ -4,14 +4,15 @@ use itertools::Itertools;
 use object_store::{memory::InMemory, path::Path, ObjectStore};
 use url::Url;
 
-use crate::actions::{get_log_schema, METADATA_NAME, PROTOCOL_NAME};
+use crate::actions::{get_log_add_schema, METADATA_NAME, PROTOCOL_NAME};
 use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
 use crate::engine::default::filesystem::ObjectStoreFileSystemClient;
 use crate::engine::sync::SyncEngine;
 use crate::log_segment::LogSegment;
+use crate::scan::test_utils::add_batch_simple;
 use crate::snapshot::CheckpointMetadata;
 use crate::utils::test_utils::{MockEngine, MockJsonHandler, MockParquetHandler};
-use crate::{Engine, Expression, FileMeta, FileSystemClient, Table};
+use crate::{Engine, EngineData, Expression, FileMeta, FileSystemClient, Table};
 use test_utils::delta_path_for_version;
 
 #[test]
@@ -38,7 +39,6 @@ fn test_log_replay() {
     // Define checkpoint and commit files
     let checkpoint_meta = create_file_meta("file:///00000000000000000001.checkpoint1.json");
     let checkpoint_parts = vec![checkpoint_meta.to_parsed_log_path()];
-
     let commit_files = [
         "file:///00000000000000000001.version1.json",
         "file:///00000000000000000002.version2.json",
@@ -47,9 +47,10 @@
     ]
     .map(|path| create_file_meta(path).to_parsed_log_path())
     .collect::<Vec<_>>();
+    // Define log segment
     let log_segment = LogSegment {
         end_version: 0,
-        log_root: Url::parse("file:///checkpoint.json").expect("Invalid log root URL"),
+        log_root: Url::parse("file:///root.json").expect("Invalid log root URL"),
         ascending_commit_files: commit_files.clone(),
         checkpoint_parts,
     };
@@ -68,34 +69,46 @@
         Expression::column([METADATA_NAME, "id"]).is_not_null(),
         Expression::column([PROTOCOL_NAME, "minReaderVersion"]).is_not_null(),
     )));
-    let commit_schema = get_log_schema()
-        .project(&[METADATA_NAME])
-        .expect("Failed to project schema");
-    let checkpoint_schema = get_log_schema()
-        .project(&[PROTOCOL_NAME])
-        .expect("Failed to project schema");
+    let commit_schema = get_log_add_schema().clone();
+    let checkpoint_schema = get_log_add_schema().clone();
 
     // Expect the JSON and Parquet handlers to receive the correct files, schemas, and predicates
     mock_json_handler.expect_read_json_files(
         expected_commit_files_read.clone(),
         commit_schema.clone(),
         predicate.clone(),
-        Ok(Box::new(std::iter::empty())),
+        Ok(Box::new(std::iter::once(Ok(
+            add_batch_simple() as Box<dyn EngineData>
+        )))),
     );
-
     mock_parquet_handler.expect_read_parquet_files(
         vec![checkpoint_meta],
         checkpoint_schema.clone(),
         predicate.clone(),
-        Ok(Box::new(std::iter::empty())),
+        Ok(Box::new(std::iter::once(Ok(
+            add_batch_simple() as Box<dyn EngineData>
+        )))),
     );
 
     // Run the log replay
-    let result = log_segment.replay(&engine, commit_schema, checkpoint_schema, predicate);
+    let mut iter = log_segment
+        .replay(&engine, commit_schema, checkpoint_schema, predicate)
+        .unwrap()
+        .into_iter();
+
+    // Verify the commit batch and checkpoint batch are chained together
+    let first = iter.next().unwrap();
+    let (_, is_log_batch1) = first.unwrap();
+    assert!(is_log_batch1, "First element should be a commit batch");
+
+    let second = iter.next().unwrap();
+    let (_, is_log_batch2) = second.unwrap();
+    assert!(
+        !is_log_batch2,
+        "Second element should be a checkpoint batch"
+    );
 
-    // Verify expected results
-    assert!(result.is_ok());
-    assert!(result.unwrap().next().is_none());
+    assert!(iter.next().is_none(), "Iterator should only have 2 batches");
 }
 
 // NOTE: In addition to testing the meta-predicate for metadata replay, this test also verifies