Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support the v2Checkpoint reader/writer feature #685

Open
wants to merge 37 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
1b7e188
mvp, todo: tests
sebastiantia Feb 6, 2025
cfd5f4f
refactor
sebastiantia Feb 6, 2025
6139ddd
mvp for tests
sebastiantia Feb 7, 2025
8a8c5fb
note
sebastiantia Feb 7, 2025
62dd034
return result from tests to use shortcut ?
sebastiantia Feb 7, 2025
c2ed82e
remove snapshot creation with v2checkpoints
sebastiantia Feb 7, 2025
d5ee495
refactors
sebastiantia Feb 7, 2025
f5bbf00
leverage flatten_ok()
sebastiantia Feb 8, 2025
65abd95
address feedback
sebastiantia Feb 10, 2025
1c101ea
assertions for error refactor
sebastiantia Feb 10, 2025
7cc3c81
remove redundant type conversions
sebastiantia Feb 10, 2025
fc403c2
refactor
sebastiantia Feb 10, 2025
4d4e601
remove redundant .into_iter
sebastiantia Feb 10, 2025
415c2f4
handle errors from windows os
sebastiantia Feb 10, 2025
2547c42
remove unnecessary empty path check
sebastiantia Feb 10, 2025
ea7349a
typo
sebastiantia Feb 10, 2025
39a1451
nits
sebastiantia Feb 10, 2025
ab9ef11
infer type
sebastiantia Feb 10, 2025
302efed
review & nits
sebastiantia Feb 10, 2025
06e5c92
remove test iterator
sebastiantia Feb 10, 2025
0e57bae
review
sebastiantia Feb 11, 2025
d49f835
clippy
sebastiantia Feb 11, 2025
510dc35
link issue
sebastiantia Feb 11, 2025
c49402c
nits
sebastiantia Feb 12, 2025
2fd8216
nits
sebastiantia Feb 12, 2025
f8defbe
test review
sebastiantia Feb 12, 2025
638387c
nits
sebastiantia Feb 12, 2025
4984655
remove debug statements
sebastiantia Feb 13, 2025
a77333c
review
sebastiantia Feb 19, 2025
b25bb58
comments & review
sebastiantia Feb 19, 2025
93971c5
typo
sebastiantia Feb 19, 2025
501c675
typo
sebastiantia Feb 19, 2025
0b9dc43
snapshot creation with v2checkpoints mvp
sebastiantia Feb 7, 2025
d92e141
file name change??
sebastiantia Feb 10, 2025
e5748e1
fix merge conflict errors
sebastiantia Feb 12, 2025
a035f8c
review & nits
sebastiantia Feb 19, 2025
3dcd085
merge error
sebastiantia Feb 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,6 @@ pub struct SetTransaction {
/// file actions. This action is only allowed in checkpoints following the V2 spec.
///
/// [More info]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#sidecar-file-information
#[allow(unused)] //TODO: Remove once we implement V2 checkpoint file processing
#[derive(Schema, Debug, PartialEq)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) struct Sidecar {
Expand Down Expand Up @@ -783,15 +782,15 @@ mod tests {
}

#[test]
fn test_v2_checkpoint_unsupported() {
fn test_v2_checkpoint_supported() {
let protocol = Protocol::try_new(
3,
7,
Some([ReaderFeatures::V2Checkpoint]),
Some([ReaderFeatures::V2Checkpoint]),
)
.unwrap();
assert!(protocol.ensure_read_supported().is_err());
assert!(protocol.ensure_read_supported().is_ok());

let protocol = Protocol::try_new(
4,
Expand Down Expand Up @@ -821,7 +820,7 @@ mod tests {
Some(&empty_features),
)
.unwrap();
assert!(protocol.ensure_read_supported().is_err());
assert!(protocol.ensure_read_supported().is_ok());

let protocol = Protocol::try_new(
3,
Expand All @@ -839,7 +838,7 @@ mod tests {
Some([WriterFeatures::V2Checkpoint]),
)
.unwrap();
assert!(protocol.ensure_read_supported().is_err());
assert!(protocol.ensure_read_supported().is_ok());

let protocol = Protocol {
min_reader_version: 1,
Expand Down
5 changes: 2 additions & 3 deletions kernel/src/actions/visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ impl RowVisitor for CdcVisitor {
))
);
for i in 0..row_count {
// Since path column is required, use it to detect presence of an Add action
// Since path column is required, use it to detect presence of a Cdc action
if let Some(path) = getters[0].get_opt(i, "cdc.path")? {
self.cdcs.push(Self::visit_cdc(i, path, getters)?);
}
Expand Down Expand Up @@ -438,7 +438,6 @@ impl RowVisitor for SetTransactionVisitor {
}
}

#[allow(unused)] //TODO: Remove once we implement V2 checkpoint file processing
#[derive(Default)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) struct SidecarVisitor {
Expand Down Expand Up @@ -475,7 +474,7 @@ impl RowVisitor for SidecarVisitor {
))
);
for i in 0..row_count {
// Since path column is required, use it to detect presence of a sidecar action
// Since path column is required, use it to detect presence of a Sidecar action
if let Some(path) = getters[0].get_opt(i, "sidecar.path")? {
self.sidecars.push(Self::visit_sidecar(i, path, getters)?);
}
Expand Down
150 changes: 141 additions & 9 deletions kernel/src/log_segment.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
//! Represents a segment of a delta log. [`LogSegment`] wraps a set of checkpoint and commit
//! files.

use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME};
use crate::actions::visitors::SidecarVisitor;
use crate::actions::{
get_log_add_schema, get_log_schema, Metadata, Protocol, Sidecar, ADD_NAME, METADATA_NAME,
PROTOCOL_NAME, SIDECAR_NAME,
};
use crate::path::{LogPathFileType, ParsedLogPath};
use crate::schema::SchemaRef;
use crate::snapshot::CheckpointMetadata;
use crate::utils::require;
use crate::{
DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileSystemClient, Version,
DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileMeta, FileSystemClient,
ParquetHandler, RowVisitor, Version,
};
use itertools::Itertools;
use std::collections::HashMap;
Expand Down Expand Up @@ -213,17 +218,144 @@ impl LogSegment {
.read_json_files(&commit_files, commit_read_schema, meta_predicate.clone())?
.map_ok(|batch| (batch, true));

let checkpoint_parts: Vec<_> = self
.checkpoint_parts
let checkpoint_stream = Self::create_checkpoint_stream(
engine,
checkpoint_read_schema,
meta_predicate,
self.checkpoint_parts.clone(),
self.log_root.clone(),
)?;

Ok(commit_stream.chain(checkpoint_stream))
}

/// Returns an iterator over checkpoint data, processing sidecar files when necessary.
///
/// By default, `create_checkpoint_stream` checks for the presence of sidecar files, and
/// reads their contents if present. Checking for sidecar files is skipped if:
/// - The checkpoint is a multi-part checkpoint
/// - The checkpoint read schema does not contain the add action
///
/// For single-part checkpoints, any referenced sidecar files are processed. These
/// sidecar files contain the actual add actions that would otherwise be
/// stored directly in the checkpoint. The sidecar file batches are chained to the
/// checkpoint batch in the top level iterator to be returned.
fn create_checkpoint_stream(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: i think the *_stream naming is an artifact from the early days when this was infract a stream since then we moved to iterators. Maybe it makes sense to update function names accordingly?

engine: &dyn Engine,
checkpoint_read_schema: SchemaRef,
meta_predicate: Option<ExpressionRef>,
checkpoint_parts: Vec<ParsedLogPath>,
log_root: Url,
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
let need_add_actions = checkpoint_read_schema.contains(ADD_NAME);
require!(
!need_add_actions || checkpoint_read_schema.contains(SIDECAR_NAME),
Error::invalid_checkpoint(
"If the checkpoint read schema contains file actions, it must contain the sidecar column"
)
);

let checkpoint_file_meta: Vec<FileMeta> = checkpoint_parts
.iter()
.map(|f| f.location.clone())
.collect();
let checkpoint_stream = engine
.get_parquet_handler()
.read_parquet_files(&checkpoint_parts, checkpoint_read_schema, meta_predicate)?
.map_ok(|batch| (batch, false));

Ok(commit_stream.chain(checkpoint_stream))
let parquet_handler = engine.get_parquet_handler();

// Historically, we had a shared file reader trait for JSON and Parquet handlers,
// but it was removed to avoid unnecessary coupling. This is a concrete case
// where it *could* have been useful, but for now, we're keeping them separate.
// If similar patterns start appearing elsewhere, we should reconsider that decision.
let actions = if checkpoint_parts
.first()
.is_some_and(|p| p.extension == "json")
{
engine.get_json_handler().read_json_files(
&checkpoint_file_meta,
checkpoint_read_schema,
meta_predicate,
)?
} else {
parquet_handler.read_parquet_files(
&checkpoint_file_meta,
checkpoint_read_schema,
meta_predicate,
)?
};

let actions_iter = actions
.map(move |batch_result| -> DeltaResult<_> {
let checkpoint_batch = batch_result?;
// This closure maps the checkpoint batch to an iterator of batches
// by chaining the checkpoint batch with sidecar batches if they exist.

// 1. In the case where the schema does not contain the add action, we return the checkpoint
// batch directly as sidecar files only have to be read when the schema contains the add action.
// 2. Multi-part checkpoint batches never have sidecar actions, so the batch is returned as-is.
let sidecar_content = if need_add_actions && checkpoint_parts.len() == 1 {
Self::process_sidecars(
parquet_handler.clone(), // cheap Arc clone
log_root.clone(),
checkpoint_batch.as_ref(),
)?
} else {
None
};

let combined_batches = std::iter::once(Ok(checkpoint_batch))
.chain(sidecar_content.into_iter().flatten())
.map_ok(|sidecar_batch| (sidecar_batch, false));

Ok(combined_batches)
})
.flatten_ok()
.map(|result| result?); // result-result to result

Ok(actions_iter)
}

/// Processes sidecar files for the given checkpoint batch.
///
/// This function extracts any sidecar file references from the provided batch.
/// Each sidecar file is read and an iterator of sidecar file batches is returned
fn process_sidecars(
parquet_handler: Arc<dyn ParquetHandler>,
log_root: Url,
batch: &dyn EngineData,
) -> DeltaResult<Option<impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>> {
// Visit the rows of the checkpoint batch to extract sidecar file references
let mut visitor = SidecarVisitor::default();
visitor.visit_rows_of(batch)?;

// If there are no sidecar files, return early
if visitor.sidecars.is_empty() {
return Ok(None);
}

let sidecar_files: Vec<_> = visitor
.sidecars
.iter()
.map(|sidecar| Self::sidecar_to_filemeta(sidecar, &log_root))
.try_collect()?;

// Read the sidecar files and return an iterator of sidecar file batches
Ok(Some(parquet_handler.read_parquet_files(
&sidecar_files,
get_log_add_schema().clone(),
None,
)?))
}

/// Convert a Sidecar record to a FileMeta.
///
/// This helper first builds the URL by joining the provided log_root with
/// the "_sidecars/" folder and the given sidecar path.
fn sidecar_to_filemeta(sidecar: &Sidecar, log_root: &Url) -> DeltaResult<FileMeta> {
Ok(FileMeta {
location: log_root.join("_sidecars/")?.join(&sidecar.path)?,
last_modified: sidecar.modification_time,
size: sidecar.size_in_bytes as usize,
})
}

// Get the most up-to-date Protocol and Metadata actions
Expand Down
Loading
Loading