
Commit

mvp, todo: tests
sebastiantia committed Feb 6, 2025
1 parent 68f4790 commit 00af1f9
Showing 9 changed files with 177 additions and 33 deletions.
8 changes: 4 additions & 4 deletions kernel/src/actions/mod.rs
@@ -783,15 +783,15 @@ mod tests {
}

#[test]
fn test_v2_checkpoint_unsupported() {
fn test_v2_checkpoint_supported() {
let protocol = Protocol::try_new(
3,
7,
Some([ReaderFeatures::V2Checkpoint]),
Some([ReaderFeatures::V2Checkpoint]),
)
.unwrap();
assert!(protocol.ensure_read_supported().is_err());
assert!(protocol.ensure_read_supported().is_ok());

let protocol = Protocol::try_new(
4,
@@ -821,7 +821,7 @@ mod tests {
Some(&empty_features),
)
.unwrap();
assert!(protocol.ensure_read_supported().is_err());
assert!(protocol.ensure_read_supported().is_ok());

let protocol = Protocol::try_new(
3,
@@ -839,7 +839,7 @@ mod tests {
Some([WriterFeatures::V2Checkpoint]),
)
.unwrap();
assert!(protocol.ensure_read_supported().is_err());
assert!(protocol.ensure_read_supported().is_ok());

let protocol = Protocol {
min_reader_version: 1,
153 changes: 147 additions & 6 deletions kernel/src/log_segment.rs
@@ -1,14 +1,20 @@
//! Represents a segment of a delta log. [`LogSegment`] wraps a set of checkpoint and commit
//! files.
use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME};
use crate::actions::visitors::SidecarVisitor;
use crate::actions::{
get_log_add_schema, get_log_schema, Metadata, Protocol, Sidecar, ADD_NAME, METADATA_NAME,
PROTOCOL_NAME, SIDECAR_NAME,
};
use crate::path::{LogPathFileType, ParsedLogPath};
use crate::schema::SchemaRef;
use crate::snapshot::CheckpointMetadata;
use crate::utils::require;
use crate::{
DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileSystemClient, Version,
DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileDataReadResultIterator,
FileMeta, FileSystemClient, ParquetHandler, RowVisitor, Version,
};
use itertools::Either::{Left, Right};
use itertools::Itertools;
use std::collections::HashMap;
use std::convert::identity;
@@ -218,14 +224,149 @@ impl LogSegment {
.iter()
.map(|f| f.location.clone())
.collect();
let checkpoint_stream = engine
.get_parquet_handler()
.read_parquet_files(&checkpoint_parts, checkpoint_read_schema, meta_predicate)?
.map_ok(|batch| (batch, false));
let checkpoint_stream = if checkpoint_parts.is_empty() {
Left(None.into_iter())
} else {
Right(self.create_checkpoint_stream(
engine,
checkpoint_read_schema,
meta_predicate,
checkpoint_parts,
)?)
};

Ok(commit_stream.chain(checkpoint_stream))
}

// Returns an iterator over the checkpoint batches for this log segment. When the schema
// requests file actions, any batch containing sidecar references is replaced by the batches
// read from the referenced sidecar files.
fn create_checkpoint_stream(
&self,
engine: &dyn Engine,
checkpoint_read_schema: SchemaRef,
meta_predicate: Option<ExpressionRef>,
checkpoint_parts: Vec<FileMeta>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
// The caller only invokes this when a checkpoint exists; enforce that invariant here as well.
require!(
!checkpoint_parts.is_empty(),
Error::generic("Checkpoint parts should not be empty")
);
let is_json_checkpoint = checkpoint_parts[0].location.path().ends_with(".json");

let actions = Self::read_checkpoint_files(
engine,
checkpoint_parts,
checkpoint_read_schema.clone(),
meta_predicate,
is_json_checkpoint,
)?;

let need_file_actions = checkpoint_read_schema.contains(ADD_NAME);

require!(
!need_file_actions || checkpoint_read_schema.contains(SIDECAR_NAME),
Error::generic(
"If the checkpoint read schema contains file actions, it must contain the sidecar column"
)
);

let is_multi_part_checkpoint = self.checkpoint_parts.len() > 1;
let log_root = self.log_root.clone();
let parquet_handler = engine.get_parquet_handler().clone();

// If the schema does not request file actions (e.g., metadata-only replay), the checkpoint
// batches are returned directly.
// - Multi-part checkpoints never reference sidecar files, so they are also returned as-is.
// - Otherwise, each checkpoint batch is checked for sidecar references and replaced by the
//   referenced sidecar batches when any are found.
let checkpoint_stream = if need_file_actions && !is_multi_part_checkpoint {
Left(
actions
// Flatten the new batches returned. The new batches could be:
// - the checkpoint batch itself if no sidecar actions are present in the batch
// - 1 or more sidecar batches referenced in the checkpoint batch by sidecar actions
.flat_map(move |batch_result| match batch_result {
Ok(checkpoint_batch) => Right(
Self::process_checkpoint_batch(
parquet_handler.clone(),
log_root.clone(),
checkpoint_batch,
)
.map_or_else(|e| Left(std::iter::once(Err(e))), Right)
.map_ok(|batch| (batch, false)),
),
Err(e) => Left(std::iter::once(Err(e))),
}),
)
} else {
Right(actions.map_ok(|batch| (batch, false)))
};

Ok(checkpoint_stream)
}

// Examines a checkpoint batch for sidecar actions; if any are found, returns the batches read
// from the referenced sidecar files, otherwise returns the batch unchanged.
fn process_checkpoint_batch(
parquet_handler: Arc<dyn ParquetHandler>,
log_root: Url,
batch: Box<dyn EngineData>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send> {
let mut visitor = SidecarVisitor::default();

// Collect sidecars
visitor.visit_rows_of(batch.as_ref())?;

// If there are no sidecars, return the batch as is
if visitor.sidecars.is_empty() {
return Ok(Left(std::iter::once(Ok(batch))));
}

// Convert sidecar actions to sidecar file paths
let sidecar_files: Result<Vec<_>, _> = visitor
.sidecars
.iter()
.map(|sidecar| Self::sidecar_to_filemeta(sidecar, &log_root))
.collect();

let sidecar_read_schema = get_log_add_schema().clone();

// If sidecar files exist, read them and return the iterator of sidecar batches to replace
// the checkpoint batch in the top-level iterator
Ok(Right(parquet_handler.read_parquet_files(
&sidecar_files?,
sidecar_read_schema,
None,
)?))
}

// Helper function to read checkpoint files based on the file type
fn read_checkpoint_files(
engine: &dyn Engine,
checkpoint_parts: Vec<FileMeta>,
schema: SchemaRef,
predicate: Option<ExpressionRef>,
is_json: bool,
) -> DeltaResult<FileDataReadResultIterator> {
if is_json {
engine
.get_json_handler()
.read_json_files(&checkpoint_parts, schema, predicate)
} else {
engine
.get_parquet_handler()
.read_parquet_files(&checkpoint_parts, schema, predicate)
}
}

// Helper function to convert a single sidecar action to a FileMeta
fn sidecar_to_filemeta(sidecar: &Sidecar, log_root: &Url) -> Result<FileMeta, Error> {
let location = log_root.join("_sidecars/")?.join(&sidecar.path)?;
Ok(FileMeta {
location,
last_modified: sidecar.modification_time,
size: sidecar.size_in_bytes as usize,
})
}

// Get the most up-to-date Protocol and Metadata actions
pub(crate) fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult<(Metadata, Protocol)> {
let data_batches = self.replay_for_metadata(engine)?;
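
For reference, sidecar_to_filemeta resolves each sidecar reference against a _sidecars/ directory under the log root. A minimal sketch of that join behavior using url::Url directly; the table location and sidecar file name below are made up for illustration, and the snippet assumes a surrounding function that returns a Result:

use url::Url;

// Hypothetical log root and sidecar file name, chosen only to show the join behavior.
let log_root = Url::parse("file:///tmp/my_table/_delta_log/")?;
let location = log_root
    .join("_sidecars/")?
    .join("016ae953-37a9-438e-8683-9a9a4a79a395.parquet")?;
assert_eq!(
    location.as_str(),
    "file:///tmp/my_table/_delta_log/_sidecars/016ae953-37a9-438e-8683-9a9a4a79a395.parquet"
);
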
6 changes: 3 additions & 3 deletions kernel/src/log_segment/tests.rs
@@ -108,7 +108,7 @@ fn build_log_with_paths_and_checkpoint(
}

#[test]
fn build_snapshot_with_unsupported_uuid_checkpoint() {
fn build_snapshot_with_uuid_checkpoint() {
let (client, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
@@ -129,10 +129,10 @@ fn build_snapshot_with_unsupported_uuid_checkpoint() {
let checkpoint_parts = log_segment.checkpoint_parts;

assert_eq!(checkpoint_parts.len(), 1);
assert_eq!(checkpoint_parts[0].version, 3);
assert_eq!(checkpoint_parts[0].version, 5);

let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
let expected_versions = vec![4, 5, 6, 7];
let expected_versions = vec![6, 7];
assert_eq!(versions, expected_versions);
}

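
The new expectations follow from UUID checkpoints now counting as real checkpoints. A rough reconstruction of the collapsed test layout above (file names abbreviated, not verbatim):

    00000000000000000003.checkpoint.parquet           classic checkpoint at version 3
    00000000000000000005.checkpoint.<uuid>.parquet    UUID (v2) checkpoint at version 5
    00000000000000000006.json
    00000000000000000007.json

With the version-5 UUID checkpoint recognized, the log segment starts there and replays only commits 6 and 7; previously it fell back to the version-3 checkpoint and replayed commits 4 through 7.
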
19 changes: 6 additions & 13 deletions kernel/src/path.rs
@@ -166,7 +166,9 @@ impl<Location: AsUrl> ParsedLogPath<Location> {
// TODO: Include UuidCheckpoint once we actually support v2 checkpoints
matches!(
self.file_type,
LogPathFileType::SinglePartCheckpoint | LogPathFileType::MultiPartCheckpoint { .. }
LogPathFileType::SinglePartCheckpoint
| LogPathFileType::MultiPartCheckpoint { .. }
| LogPathFileType::UuidCheckpoint(_)
)
}

@@ -175,10 +177,7 @@ impl<Location: AsUrl> ParsedLogPath<Location> {
#[allow(dead_code)] // currently only used in tests, which don't "count"
fn is_unknown(&self) -> bool {
// TODO: Stop treating UuidCheckpoint as unknown once we support v2 checkpoints
matches!(
self.file_type,
LogPathFileType::Unknown | LogPathFileType::UuidCheckpoint(_)
)
matches!(self.file_type, LogPathFileType::Unknown)
}
}

@@ -357,10 +356,7 @@ mod tests {
LogPathFileType::UuidCheckpoint(ref u) if u == "3a0d65cd-4056-49b8-937b-95f9e3ee90e5",
));
assert!(!log_path.is_commit());

// TODO: Support v2 checkpoints! Until then we can't treat these as checkpoint files.
assert!(!log_path.is_checkpoint());
assert!(log_path.is_unknown());
assert!(log_path.is_checkpoint());

let log_path = table_log_dir
.join("00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json")
@@ -377,10 +373,7 @@
LogPathFileType::UuidCheckpoint(ref u) if u == "3a0d65cd-4056-49b8-937b-95f9e3ee90e5",
));
assert!(!log_path.is_commit());

// TODO: Support v2 checkpoints! Until then we can't treat these as checkpoint files.
assert!(!log_path.is_checkpoint());
assert!(log_path.is_unknown());
assert!(log_path.is_checkpoint());

let log_path = table_log_dir
.join("00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.foo")
4 changes: 2 additions & 2 deletions kernel/src/scan/mod.rs
@@ -11,7 +11,7 @@ use url::Url;
use crate::actions::deletion_vector::{
deletion_treemap_to_bools, split_vector, DeletionVectorDescriptor,
};
use crate::actions::{get_log_add_schema, get_log_schema, ADD_NAME, REMOVE_NAME};
use crate::actions::{get_log_schema, ADD_NAME, REMOVE_NAME, SIDECAR_NAME};
use crate::expressions::{ColumnName, Expression, ExpressionRef, ExpressionTransform, Scalar};
use crate::predicates::{DefaultPredicateEvaluator, EmptyColumnResolver};
use crate::scan::state::{DvInfo, Stats};
@@ -428,7 +428,7 @@ impl Scan {
engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
let commit_read_schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?;
let checkpoint_read_schema = get_log_add_schema().clone();
let checkpoint_read_schema = get_log_schema().project(&[ADD_NAME, SIDECAR_NAME])?;

// NOTE: We don't pass any meta-predicate because we expect no meaningful row group skipping
// when ~every checkpoint file will contain the adds and removes we are looking for.
4 changes: 4 additions & 0 deletions kernel/src/schema/mod.rs
@@ -286,6 +286,10 @@ impl StructType {
self.fields.values()
}

/// Returns `true` if this schema contains a top-level field with the given name.
pub fn contains(&self, name: impl AsRef<str>) -> bool {
self.fields.contains_key(name.as_ref())
}

/// Extracts the name and type of all leaf columns, in schema order. Caller should pass Some
/// `own_name` if this schema is embedded in a larger struct (e.g. `add.*`) and None if the
/// schema is a top-level result (e.g. `*`).
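
The new contains helper is what create_checkpoint_stream and the scan schema change above rely on to detect whether file actions and the sidecar column were requested. A minimal crate-internal sketch of its use, assuming SchemaRef derefs to StructType as the call sites above do:

let schema = get_log_schema().project(&[ADD_NAME, SIDECAR_NAME])?;
assert!(schema.contains(ADD_NAME));
assert!(schema.contains(SIDECAR_NAME));
assert!(!schema.contains(METADATA_NAME));
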
4 changes: 2 additions & 2 deletions kernel/src/table_configuration.rs
@@ -262,8 +262,8 @@ mod test {
let protocol = Protocol::try_new(
3,
7,
Some([ReaderFeatures::V2Checkpoint]),
Some([WriterFeatures::V2Checkpoint]),
Some([ReaderFeatures::UnsupportedFeature]),
Some([WriterFeatures::UnsupportedFeature]),
)
.unwrap();
let table_root = Url::try_from("file:///").unwrap();
7 changes: 7 additions & 0 deletions kernel/src/table_features/mod.rs
@@ -48,6 +48,8 @@ pub enum ReaderFeatures {
/// vacuumProtocolCheck ReaderWriter feature ensures consistent application of reader and writer
/// protocol checks during VACUUM operations
VacuumProtocolCheck,
/// A dummy variant used to represent an unsupported feature for testing purposes
UnsupportedFeature,
}

/// Similar to reader features, writer features communicate capabilities that must be implemented
@@ -109,6 +111,8 @@ pub enum WriterFeatures {
/// vacuumProtocolCheck ReaderWriter feature ensures consistent application of reader and writer
/// protocol checks during VACUUM operations
VacuumProtocolCheck,
/// A dummy variant used to represent an unsupported feature for testing purposes
UnsupportedFeature,
}

impl From<ReaderFeatures> for String {
@@ -133,6 +137,7 @@ pub(crate) static SUPPORTED_READER_FEATURES: LazyLock<HashSet<ReaderFeatures>> =
ReaderFeatures::TypeWidening,
ReaderFeatures::TypeWideningPreview,
ReaderFeatures::VacuumProtocolCheck,
ReaderFeatures::V2Checkpoint,
])
});

@@ -154,6 +159,7 @@ mod tests {
(ReaderFeatures::TypeWideningPreview, "typeWidening-preview"),
(ReaderFeatures::V2Checkpoint, "v2Checkpoint"),
(ReaderFeatures::VacuumProtocolCheck, "vacuumProtocolCheck"),
(ReaderFeatures::UnsupportedFeature, "unsupportedFeature"),
];

assert_eq!(ReaderFeatures::VARIANTS.len(), cases.len());
@@ -192,6 +198,7 @@ mod tests {
(WriterFeatures::IcebergCompatV1, "icebergCompatV1"),
(WriterFeatures::IcebergCompatV2, "icebergCompatV2"),
(WriterFeatures::VacuumProtocolCheck, "vacuumProtocolCheck"),
(WriterFeatures::UnsupportedFeature, "unsupportedFeature"),
];

assert_eq!(WriterFeatures::VARIANTS.len(), cases.len());
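
With v2Checkpoint moved into SUPPORTED_READER_FEATURES, the table_configuration test above needed a feature that is still guaranteed to be rejected, which is what the dummy UnsupportedFeature variant provides. A hedged sketch of that intent, assuming the surrounding test still asserts the unsupported path as the swap suggests:

let protocol = Protocol::try_new(
    3,
    7,
    Some([ReaderFeatures::UnsupportedFeature]),
    Some([WriterFeatures::UnsupportedFeature]),
)
.unwrap();
// UnsupportedFeature is deliberately absent from SUPPORTED_READER_FEATURES,
// so read support should still be refused.
assert!(protocol.ensure_read_supported().is_err());
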
5 changes: 2 additions & 3 deletions kernel/tests/golden_tables.rs
@@ -409,9 +409,8 @@ golden_test!("time-travel-start", latest_snapshot_test);
golden_test!("time-travel-start-start20", latest_snapshot_test);
golden_test!("time-travel-start-start20-start40", latest_snapshot_test);

skip_test!("v2-checkpoint-json": "v2 checkpoint not supported");
skip_test!("v2-checkpoint-parquet": "v2 checkpoint not supported");

golden_test!("v2-checkpoint-json", latest_snapshot_test);
golden_test!("v2-checkpoint-parquet", latest_snapshot_test);
// BUG:
// - AddFile: 'file:/some/unqualified/absolute/path'
// - RemoveFile: '/some/unqualified/absolute/path'
