Skip to content

Commit

Permalink
assert batches returned in replay test
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastiantia committed Feb 5, 2025
1 parent 1f8f140 commit 84e6010
Showing 1 changed file with 30 additions and 17 deletions.
47 changes: 30 additions & 17 deletions kernel/src/log_segment/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ use itertools::Itertools;
use object_store::{memory::InMemory, path::Path, ObjectStore};
use url::Url;

use crate::actions::{get_log_schema, METADATA_NAME, PROTOCOL_NAME};
use crate::actions::{get_log_add_schema, METADATA_NAME, PROTOCOL_NAME};
use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
use crate::engine::default::filesystem::ObjectStoreFileSystemClient;
use crate::engine::sync::SyncEngine;
use crate::log_segment::LogSegment;
use crate::scan::test_utils::add_batch_simple;
use crate::snapshot::CheckpointMetadata;
use crate::utils::test_utils::{MockEngine, MockJsonHandler, MockParquetHandler};
use crate::{Engine, Expression, FileMeta, FileSystemClient, Table};
use crate::{Engine, EngineData, Expression, FileMeta, FileSystemClient, Table};
use test_utils::delta_path_for_version;

#[test]
Expand All @@ -38,7 +39,6 @@ fn test_log_replay() {
// Define checkpoint and commit files
let checkpoint_meta = create_file_meta("file:///00000000000000000001.checkpoint1.json");
let checkpoint_parts = vec![checkpoint_meta.to_parsed_log_path()];

let commit_files = [
"file:///00000000000000000001.version1.json",
"file:///00000000000000000002.version2.json",
Expand All @@ -47,9 +47,10 @@ fn test_log_replay() {
.map(|path| create_file_meta(path).to_parsed_log_path())
.collect::<Vec<_>>();

// Define log segment
let log_segment = LogSegment {
end_version: 0,
log_root: Url::parse("file:///checkpoint.json").expect("Invalid log root URL"),
log_root: Url::parse("file:///root.json").expect("Invalid log root URL"),
ascending_commit_files: commit_files.clone(),
checkpoint_parts,
};
Expand All @@ -68,34 +69,46 @@ fn test_log_replay() {
Expression::column([METADATA_NAME, "id"]).is_not_null(),
Expression::column([PROTOCOL_NAME, "minReaderVersion"]).is_not_null(),
)));
let commit_schema = get_log_schema()
.project(&[METADATA_NAME])
.expect("Failed to project schema");
let checkpoint_schema = get_log_schema()
.project(&[PROTOCOL_NAME])
.expect("Failed to project schema");
let commit_schema = get_log_add_schema().clone();
let checkpoint_schema = get_log_add_schema().clone();

// Expect the JSON and Parquet handlers to receive the correct files, schemas, and predicates
mock_json_handler.expect_read_json_files(
expected_commit_files_read.clone(),
commit_schema.clone(),
predicate.clone(),
Ok(Box::new(std::iter::empty())),
Ok(Box::new(std::iter::once(Ok(
add_batch_simple() as Box<dyn EngineData>
)))),
);

mock_parquet_handler.expect_read_parquet_files(
vec![checkpoint_meta],
checkpoint_schema.clone(),
predicate.clone(),
Ok(Box::new(std::iter::empty())),
Ok(Box::new(std::iter::once(Ok(
add_batch_simple() as Box<dyn EngineData>
)))),
);

// Run the log replay
let result = log_segment.replay(&engine, commit_schema, checkpoint_schema, predicate);
let mut iter = log_segment
.replay(&engine, commit_schema, checkpoint_schema, predicate)
.unwrap()
.into_iter();

// Verify the commit batch and checkpoint batch are chained together
let first = iter.next().unwrap();
let (_, is_log_batch1) = first.unwrap();
assert!(is_log_batch1, "First element should be a commit batch");

let second = iter.next().unwrap();
let (_, is_log_batch2) = second.unwrap();
assert!(
!is_log_batch2,
"Second element should be a checkpoint batch"
);

// Verify expected results
assert!(result.is_ok());
assert!(result.unwrap().next().is_none());
assert!(iter.next().is_none(), "Iterator should only have 2 batches");
}

// NOTE: In addition to testing the meta-predicate for metadata replay, this test also verifies
Expand Down

0 comments on commit 84e6010

Please sign in to comment.