feat: extract & insert sidecar batches in replay's action iterator #679

Open: wants to merge 34 commits into base: main
Commits
a7b96a0
mvp, todo: tests
sebastiantia Feb 6, 2025
d55768e
refactor
sebastiantia Feb 6, 2025
c880d1f
mvp for tests
sebastiantia Feb 7, 2025
68a5139
note
sebastiantia Feb 7, 2025
ce2ee83
return result from tests to use shortcut ?
sebastiantia Feb 7, 2025
ec44a01
remove snapshot creation with v2checkpoints
sebastiantia Feb 7, 2025
029b2f2
refactors
sebastiantia Feb 7, 2025
3e4ea17
leverage flatten_ok()
sebastiantia Feb 8, 2025
89e02fc
address feedback
sebastiantia Feb 10, 2025
2ff3eb1
assertions for error refactor
sebastiantia Feb 10, 2025
91a187e
remove redundant type conversions
sebastiantia Feb 10, 2025
0b57452
refactor
sebastiantia Feb 10, 2025
433a4bb
remove redundant .into_iter
sebastiantia Feb 10, 2025
2122f59
handle errors from windows os
sebastiantia Feb 10, 2025
98cba07
remove unnecessary empty path check
sebastiantia Feb 10, 2025
51a34a8
typo
sebastiantia Feb 10, 2025
fe22868
nits
sebastiantia Feb 10, 2025
1e8ed59
infer type
sebastiantia Feb 10, 2025
f6370ef
review & nits
sebastiantia Feb 10, 2025
1133914
remove test iterator
sebastiantia Feb 10, 2025
b60ba43
review
sebastiantia Feb 11, 2025
eb0f1bb
clippy
sebastiantia Feb 11, 2025
df874bb
link issue
sebastiantia Feb 11, 2025
5ea49d7
nits
sebastiantia Feb 12, 2025
306e4ea
nits
sebastiantia Feb 12, 2025
9a67e06
test review
sebastiantia Feb 12, 2025
59936cf
nits
sebastiantia Feb 12, 2025
fc180a4
remove debug statements
sebastiantia Feb 13, 2025
ce422f7
review
sebastiantia Feb 19, 2025
21893d6
comments & review
sebastiantia Feb 19, 2025
6e64916
typo
sebastiantia Feb 19, 2025
626f7b4
typo
sebastiantia Feb 19, 2025
ff67fcf
review
sebastiantia Feb 21, 2025
fbe1d87
fix arrow imports
sebastiantia Feb 21, 2025
1 change: 0 additions & 1 deletion kernel/src/actions/mod.rs
@@ -524,7 +524,6 @@ pub struct SetTransaction {
/// file actions. This action is only allowed in checkpoints following the V2 spec.
///
/// [More info]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#sidecar-file-information
- #[allow(unused)] //TODO: Remove once we implement V2 checkpoint file processing
#[derive(Schema, Debug, PartialEq)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) struct Sidecar {
5 changes: 2 additions & 3 deletions kernel/src/actions/visitors.rs
@@ -352,7 +352,7 @@ impl RowVisitor for CdcVisitor {
))
);
for i in 0..row_count {
- // Since path column is required, use it to detect presence of an Add action
+ // Since path column is required, use it to detect presence of a Cdc action
if let Some(path) = getters[0].get_opt(i, "cdc.path")? {
self.cdcs.push(Self::visit_cdc(i, path, getters)?);
}
@@ -438,7 +438,6 @@ impl RowVisitor for SetTransactionVisitor {
}
}

- #[allow(unused)] //TODO: Remove once we implement V2 checkpoint file processing
#[derive(Default)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) struct SidecarVisitor {
@@ -475,7 +474,7 @@ impl RowVisitor for SidecarVisitor {
))
);
for i in 0..row_count {
- // Since path column is required, use it to detect presence of a sidecar action
+ // Since path column is required, use it to detect presence of a Sidecar action
if let Some(path) = getters[0].get_opt(i, "sidecar.path")? {
self.sidecars.push(Self::visit_sidecar(i, path, getters)?);
}
152 changes: 144 additions & 8 deletions kernel/src/log_segment.rs
@@ -1,13 +1,18 @@
//! Represents a segment of a delta log. [`LogSegment`] wraps a set of checkpoint and commit
//! files.

- use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME};
+ use crate::actions::visitors::SidecarVisitor;
+ use crate::actions::{
+ get_log_add_schema, get_log_schema, Metadata, Protocol, Sidecar, ADD_NAME, METADATA_NAME,
+ PROTOCOL_NAME, REMOVE_NAME, SIDECAR_NAME,
+ };
use crate::path::{LogPathFileType, ParsedLogPath};
use crate::schema::SchemaRef;
use crate::snapshot::CheckpointMetadata;
use crate::utils::require;
use crate::{
- DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileSystemClient, Version,
+ DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileMeta, FileSystemClient,
+ ParquetHandler, RowVisitor, Version,
};
use itertools::Itertools;
use std::collections::HashMap;
@@ -213,17 +218,148 @@ impl LogSegment {
.read_json_files(&commit_files, commit_read_schema, meta_predicate.clone())?
.map_ok(|batch| (batch, true));

- let checkpoint_parts: Vec<_> = self
let checkpoint_stream =
self.create_checkpoint_stream(engine, checkpoint_read_schema, meta_predicate)?;

Ok(commit_stream.chain(checkpoint_stream))
}

/// Returns an iterator over checkpoint data, processing sidecar files when necessary.
///
/// By default, `create_checkpoint_stream` checks for the presence of sidecar files, and
/// reads their contents if present. Checking for sidecar files is skipped if:
/// - The checkpoint is a multi-part checkpoint
/// - The checkpoint read schema does not contain file actions (add or remove)
///
/// For single-part checkpoints, any referenced sidecar files are processed. These
/// sidecar files contain the actual add actions that would otherwise be
/// stored directly in the checkpoint. The sidecar file batches are chained to the
/// checkpoint batch in the top level iterator to be returned.
fn create_checkpoint_stream(
&self,
engine: &dyn Engine,
checkpoint_read_schema: SchemaRef,
meta_predicate: Option<ExpressionRef>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
let need_file_actions = checkpoint_read_schema.contains(ADD_NAME)
| checkpoint_read_schema.contains(REMOVE_NAME);
require!(
!need_file_actions || checkpoint_read_schema.contains(SIDECAR_NAME),
Error::invalid_checkpoint(
"If the checkpoint read schema contains file actions, it must contain the sidecar column"
)
);

let checkpoint_file_meta: Vec<_> = self
.checkpoint_parts
.iter()
.map(|f| f.location.clone())
.collect();
- let checkpoint_stream = engine
- .get_parquet_handler()
- .read_parquet_files(&checkpoint_parts, checkpoint_read_schema, meta_predicate)?
- .map_ok(|batch| (batch, false));

- Ok(commit_stream.chain(checkpoint_stream))
let parquet_handler = engine.get_parquet_handler();

// Historically, we had a shared file reader trait for JSON and Parquet handlers,
// but it was removed to avoid unnecessary coupling. This is a concrete case
// where it *could* have been useful, but for now, we're keeping them separate.
// If similar patterns start appearing elsewhere, we should reconsider that decision.
let actions = if self
.checkpoint_parts
.first()
.is_some_and(|p| p.extension == "json")
{
engine.get_json_handler().read_json_files(
&checkpoint_file_meta,
checkpoint_read_schema,
meta_predicate,
)?
} else {
parquet_handler.read_parquet_files(
&checkpoint_file_meta,
checkpoint_read_schema,
meta_predicate,
)?
};
let log_root = self.log_root.clone();

let actions_iter = actions
.map(move |checkpoint_batch_result| -> DeltaResult<_> {
let checkpoint_batch = checkpoint_batch_result?;
// This closure maps the checkpoint batch to an iterator of batches
// by chaining the checkpoint batch with sidecar batches if they exist.

// 1. If the schema does not contain file actions (add or remove), we return the checkpoint
// batch directly, since sidecar files only have to be read when file actions are requested.
// 2. Multi-part checkpoint batches never have sidecar actions, so the batch is returned as-is.
let sidecar_content = if need_file_actions && checkpoint_file_meta.len() == 1 {
Self::process_sidecars(
parquet_handler.clone(), // cheap Arc clone
log_root.clone(),
checkpoint_batch.as_ref(),
)?
} else {
None
};

let combined_batches = std::iter::once(Ok(checkpoint_batch))
.chain(sidecar_content.into_iter().flatten())
// The boolean flag indicates whether the batch originated from a commit file (true) or a checkpoint file (false).
.map_ok(|sidecar_batch| (sidecar_batch, false));

Ok(combined_batches)
})
.flatten_ok()
.map(|result| result?); // result-result to result

Ok(actions_iter)
}
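
For readers following the iterator plumbing above, here is a minimal standalone sketch of the same shape: each checkpoint batch is expanded into itself plus any sidecar batches it references, every batch is tagged `false` (not from a commit file), and errors stay in the stream as `Err` items. It uses only the standard library, so `flat_map` plus a boxed iterator stands in for itertools' `flatten_ok()`. All names here (`Batch`, `read_sidecars`, `expand`, the `sidecar:` row prefix) are hypothetical stand-ins, not kernel APIs.

```rust
use std::iter;

// Hypothetical stand-ins for the kernel's types; none of these are the real API.
type Batch = Vec<String>;
type Res<T> = Result<T, String>;
type BatchIter = Box<dyn Iterator<Item = Res<Batch>>>;
type Tagged = Box<dyn Iterator<Item = Res<(Batch, bool)>>>;

/// Hypothetical helper: rows starting with "sidecar:" count as sidecar references.
/// Returns `None` when the batch references no sidecar files.
fn read_sidecars(batch: &Batch) -> Res<Option<BatchIter>> {
    let refs: Vec<String> = batch
        .iter()
        .filter_map(|row| row.strip_prefix("sidecar:").map(str::to_owned))
        .collect();
    if refs.is_empty() {
        return Ok(None);
    }
    // Pretend each referenced file yields one batch holding a single add row.
    let batches: BatchIter =
        Box::new(refs.into_iter().map(|name| Ok(vec![format!("add:{name}")])));
    Ok(Some(batches))
}

/// Wraps an error as a one-item stream so it surfaces to the consumer.
fn fail(e: String) -> Tagged {
    Box::new(iter::once(Err(e)))
}

/// Expands each checkpoint batch into itself followed by its sidecar batches.
fn expand(
    checkpoint: impl Iterator<Item = Res<Batch>>,
) -> impl Iterator<Item = Res<(Batch, bool)>> {
    checkpoint.flat_map(|batch_result| -> Tagged {
        let batch = match batch_result {
            Ok(b) => b,
            Err(e) => return fail(e),
        };
        let sidecars = match read_sidecars(&batch) {
            Ok(s) => s,
            Err(e) => return fail(e),
        };
        Box::new(
            iter::once(Ok(batch))
                // `Option<iterator>` flattens to "zero or more" sidecar batches.
                .chain(sidecars.into_iter().flatten())
                // `false` marks the batch as not originating from a commit file.
                .map(|r| r.map(|b| (b, false))),
        )
    })
}
```

The sidecar read is deferred until the consumer pulls the corresponding checkpoint batch, which is why the real code needs a handler that outlives the call that built the iterator.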

/// Processes sidecar files for the given checkpoint batch.
///
/// This function extracts any sidecar file references from the provided batch.
/// Each sidecar file is read and an iterator of sidecar file batches is returned.
fn process_sidecars(
parquet_handler: Arc<dyn ParquetHandler>,
Collaborator

iirc, we want to avoid passing handlers around and only pass a reference to the engine. I think it's because we want to make it clear that the handler is tied to the engine, and we don't want to encourage holding an Arc ref to the handler.

cc @zachschuermann to double check.

Collaborator Author

I recall running into lifetime issues when passing the entire engine. I believe we would have to explicitly tie the iterator's lifetime to that of the engine?

Collaborator

You can also pass an Arc.

Basically the iterator needs to hold a reference for the entire duration it's lazily evaluating. So you want to give it a reference it can hold for a long time.

Collaborator Author

But this change extends all the way to changing the scan_data function signature to explicitly tie the engine's lifetime to the iterator.

pub fn scan_data<'a>(
        &self,
        engine: &'a dyn Engine,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanData>> + 'a> {

Collaborator

Mmm, so the basic issue is that we have delayed reading of parquet files, so at some point we want an item off the iterator, and to produce it, we need to read some parquet, so we need a handler. Previously we could do all the read calls up front and then just map off that iterator, so we didn't need an engine ref plumbed through.

I think if this is all internal, i.e., we don't want to expose any of these function signatures to engines (especially in the FFI), then cloning the Arcs is fine (it's very cheap; as a suggestion, we usually put // cheap arc clone at those clone sites to make it clear).

If we do want to ever expose this, we'll need to think more, but afaict, we don't.

Collaborator Author

Thanks for the look @nicklan. Just to confirm: we will go ahead and clone the parquet handler.
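
A minimal sketch of the approach settled on in this thread, assuming a simplified engine: the lazy iterator owns its own `Arc` clone of the handler, so it does not borrow from the engine and no lifetime has to be added to the caller's signature. `Handler`, `FakeHandler`, `Engine`, and `lazy_reads` are hypothetical names for illustration only; they are not the kernel's `ParquetHandler` or `Engine` traits.

```rust
use std::sync::Arc;

// Hypothetical handler trait standing in for the kernel's parquet handler.
trait Handler: Send + Sync {
    fn read(&self, path: &str) -> String;
}

// Hypothetical implementation used only to make the sketch runnable.
struct FakeHandler;

impl Handler for FakeHandler {
    fn read(&self, path: &str) -> String {
        format!("contents of {path}")
    }
}

// Hypothetical engine that hands out shared references to its handler.
struct Engine {
    handler: Arc<dyn Handler>,
}

impl Engine {
    fn get_handler(&self) -> Arc<dyn Handler> {
        Arc::clone(&self.handler) // cheap Arc clone
    }
}

/// Returns a lazy iterator that reads each path only when it is pulled.
/// The iterator owns an `Arc` to the handler, so it does not borrow `engine`.
fn lazy_reads(engine: &Engine, paths: Vec<String>) -> Box<dyn Iterator<Item = String> + Send> {
    let handler = engine.get_handler();
    Box::new(paths.into_iter().map(move |path| handler.read(&path)))
}
```

Because the returned iterator holds no borrow of `engine`, a caller can keep consuming it after the engine reference goes away; the trade-off raised above is that this makes it easier to hold on to a handler independently of its engine.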

log_root: Url,
batch: &dyn EngineData,
) -> DeltaResult<Option<impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>> {
// Visit the rows of the checkpoint batch to extract sidecar file references
let mut visitor = SidecarVisitor::default();
visitor.visit_rows_of(batch)?;

// If there are no sidecar files, return early
if visitor.sidecars.is_empty() {
return Ok(None);
}

let sidecar_files: Vec<_> = visitor
.sidecars
.iter()
.map(|sidecar| Self::sidecar_to_filemeta(sidecar, &log_root))
Collaborator

I wonder if sidecar_to_filemeta could be a closure. We only use this once.

let sidecar_to_filemeta = |sidecar: &Sidecar| -> DeltaResult<FileMeta> {
        let location = log_root.join("_sidecars/")?.join(&sidecar.path)?;
        Ok(FileMeta {
            location,
            last_modified: sidecar.modification_time,
            size: sidecar.size_in_bytes as usize,
        })
};

And then map sidecar

    visitor
            .sidecars
            .iter()
            .map(sidecar_to_filemeta)

Collaborator

Give it a shot and see how it is.

Collaborator Author

Do you think it's a good idea to leave it as a separate function for unit testing purposes?

Collaborator

I'd say either keep the separate function (if needed for testing) or embed the logic directly in the map call? What purpose does a separately named closure serve?

(aside: not sure if cargo fmt will like my indentation choice above -- depends on whether the ( or { is more important)

.try_collect()?;

// Read the sidecar files and return an iterator of sidecar file batches
Ok(Some(parquet_handler.read_parquet_files(
&sidecar_files,
get_log_add_schema().clone(),
None,
)?))
}

/// Convert a Sidecar record to a FileMeta.
///
/// This helper first builds the URL by joining the provided log_root with
/// the "_sidecars/" folder and the given sidecar path.
fn sidecar_to_filemeta(sidecar: &Sidecar, log_root: &Url) -> DeltaResult<FileMeta> {
Ok(FileMeta {
location: log_root.join("_sidecars/")?.join(&sidecar.path)?,
last_modified: sidecar.modification_time,
size: sidecar.size_in_bytes.try_into().map_err(|_| {
Error::generic(format!(
"Failed to convert sidecar size {} to usize",
sidecar.size_in_bytes
))
})?,
})
}
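
To illustrate why a separately named function is convenient to unit test, here is a simplified, standard-library-only sketch of the conversion. The `Sidecar` and `FileMeta` structs are hypothetical reductions of the kernel types, and the location is built by plain string concatenation (assuming `log_root` ends with `/`), which is a simplification of the `Url::join` calls in the diff. The checked `usize::try_from` mirrors the `try_into` above; an `as usize` cast would silently wrap a negative size instead of reporting it.

```rust
// Hypothetical, simplified stand-ins for the kernel's `Sidecar` and `FileMeta`.
#[derive(Debug)]
struct Sidecar {
    path: String,
    size_in_bytes: i64,
    modification_time: i64,
}

#[derive(Debug, PartialEq)]
struct FileMeta {
    location: String,
    last_modified: i64,
    size: usize,
}

/// Converts one sidecar record into file metadata, rejecting sizes that do not fit in `usize`.
fn sidecar_to_filemeta(sidecar: &Sidecar, log_root: &str) -> Result<FileMeta, String> {
    let size = usize::try_from(sidecar.size_in_bytes).map_err(|_| {
        format!(
            "Failed to convert sidecar size {} to usize",
            sidecar.size_in_bytes
        )
    })?;
    Ok(FileMeta {
        // Assumption: `log_root` ends with '/', like a directory URL.
        location: format!("{log_root}_sidecars/{}", sidecar.path),
        last_modified: sidecar.modification_time,
        size,
    })
}

/// Converts all sidecars, stopping at the first failure (the std analogue of `try_collect`).
fn to_filemetas(sidecars: &[Sidecar], log_root: &str) -> Result<Vec<FileMeta>, String> {
    sidecars
        .iter()
        .map(|sidecar| sidecar_to_filemeta(sidecar, log_root))
        .collect()
}
```

With a named function, the negative-size error path can be exercised directly in a test without constructing a checkpoint batch or a visitor.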

// Get the most up-to-date Protocol and Metadata actions