Move log segment into separate module #438

Merged · 23 commits · Nov 8, 2024
5 changes: 5 additions & 0 deletions kernel/src/lib.rs
@@ -69,6 +69,11 @@ pub mod path;
#[cfg(not(feature = "developer-visibility"))]
pub(crate) mod path;

#[cfg(feature = "developer-visibility")]
pub mod log_segment;
#[cfg(not(feature = "developer-visibility"))]
pub(crate) mod log_segment;

pub mod scan;
pub mod schema;
pub mod snapshot;
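For context, the new `log_segment` declarations follow the same `developer-visibility` gating already used for `path` above: the module (and, in the file below, individual items via `visibility::make(pub)`) is public only when the feature is enabled. A minimal sketch of the pattern, using a hypothetical `inspect` module and `Details` struct rather than anything in this PR:

```rust
// Module level: declared `pub` or `pub(crate)` depending on the feature flag.
#[cfg(feature = "developer-visibility")]
pub mod inspect;
#[cfg(not(feature = "developer-visibility"))]
pub(crate) mod inspect;

// Item level: `visibility::make(pub)` upgrades the item to `pub` when the
// feature is enabled, so a single definition serves both configurations.
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) struct Details {
    pub value: u64,
}
```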
157 changes: 157 additions & 0 deletions kernel/src/log_segment.rs
@@ -0,0 +1,157 @@
//! Represents a segment of a delta log. [`LogSegment`] wraps a set of checkpoint and commit
//! files.

use crate::{
    actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME},
    schema::SchemaRef,
    DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileMeta,
};
use itertools::Itertools;
use std::sync::{Arc, LazyLock};
use url::Url;

#[derive(Debug)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) struct LogSegment {
    pub log_root: Url,
    /// Commit files in the log segment, sorted in reverse (newest-first) order
    pub commit_files: Vec<FileMeta>,
    /// Checkpoint files in the log segment
    pub checkpoint_files: Vec<FileMeta>,
}

impl LogSegment {
    /// Read a stream of log data from this log segment.
    ///
    /// The log files will be read from most recent to oldest.
    /// The boolean flag indicates whether the data was read from
    /// a commit file (true) or a checkpoint file (false).
    ///
    /// `commit_read_schema` and `checkpoint_read_schema` are the schemas to read the
    /// commit and checkpoint files with, respectively. These can be used to project
    /// the log files to a subset of the columns.
    ///
    /// `meta_predicate` is an optional expression to filter the log files with. It is _NOT_ the
    /// query's predicate, but rather a predicate for filtering log files themselves.
    #[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
    pub(crate) fn replay(
        &self,
        engine: &dyn Engine,
        commit_read_schema: SchemaRef,
        checkpoint_read_schema: SchemaRef,
        meta_predicate: Option<ExpressionRef>,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
        let commit_stream = engine
            .get_json_handler()
            .read_json_files(
                &self.commit_files,
                commit_read_schema,
                meta_predicate.clone(),
            )?
            .map_ok(|batch| (batch, true));

        let checkpoint_stream = engine
            .get_parquet_handler()
            .read_parquet_files(
                &self.checkpoint_files,
                checkpoint_read_schema,
                meta_predicate,
            )?
            .map_ok(|batch| (batch, false));

        Ok(commit_stream.chain(checkpoint_stream))
    }

    // Get the most up-to-date Protocol and Metadata actions
    pub(crate) fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult<(Metadata, Protocol)> {
        let data_batches = self.replay_for_metadata(engine)?;
        let (mut metadata_opt, mut protocol_opt) = (None, None);
        for batch in data_batches {
            let (batch, _) = batch?;
            if metadata_opt.is_none() {
                metadata_opt = Metadata::try_new_from_data(batch.as_ref())?;
            }
            if protocol_opt.is_none() {
                protocol_opt = Protocol::try_new_from_data(batch.as_ref())?;
            }
            if metadata_opt.is_some() && protocol_opt.is_some() {
                // we've found both, we can stop
                break;
            }
        }
        match (metadata_opt, protocol_opt) {
            (Some(m), Some(p)) => Ok((m, p)),
            (None, Some(_)) => Err(Error::MissingMetadata),
            (Some(_), None) => Err(Error::MissingProtocol),
            (None, None) => Err(Error::MissingMetadataAndProtocol),
        }
    }

    // Replay the commit log, projecting rows to only contain Protocol and Metadata action columns.
    fn replay_for_metadata(
        &self,
        engine: &dyn Engine,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
        let schema = get_log_schema().project(&[PROTOCOL_NAME, METADATA_NAME])?;
        // filter out log files that do not contain metadata or protocol information
        static META_PREDICATE: LazyLock<Option<ExpressionRef>> = LazyLock::new(|| {
            Some(Arc::new(Expression::or(
                Expression::column([METADATA_NAME, "id"]).is_not_null(),
                Expression::column([PROTOCOL_NAME, "minReaderVersion"]).is_not_null(),
            )))
        });
        // read the same protocol and metadata schema for both commits and checkpoints
        self.replay(engine, schema.clone(), schema, META_PREDICATE.clone())
    }
}

#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use itertools::Itertools;

    use crate::{engine::sync::SyncEngine, Table};

    // NOTE: In addition to testing the meta-predicate for metadata replay, this test also verifies
    // that the parquet reader properly infers nullcount = rowcount for missing columns. The two
    // checkpoint part files that contain transaction app ids have truncated schemas that would
    // otherwise fail skipping due to their missing nullcount stat:
    //
    // Row group 0:  count: 1  total(compressed): 111 B  total(uncompressed): 107 B
    // --------------------------------------------------------------------------------
    //              type    nulls  min / max
    // txn.appId    BINARY  0      "3ae45b72-24e1-865a-a211-3..." / "3ae45b72-24e1-865a-a211-3..."
    // txn.version  INT64   0      "4390" / "4390"
    #[test]
    fn test_replay_for_metadata() {
        let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/"));
        let url = url::Url::from_directory_path(path.unwrap()).unwrap();
        let engine = SyncEngine::new();

        let table = Table::new(url);
        let snapshot = table.snapshot(&engine, None).unwrap();
        let data: Vec<_> = snapshot
            .log_segment
            .replay_for_metadata(&engine)
            .unwrap()
            .try_collect()
            .unwrap();

        // The checkpoint has five parts, each containing one action:
        // 1. txn (physically missing P&M columns)
        // 2. metaData
        // 3. protocol
        // 4. add
        // 5. txn (physically missing P&M columns)
        //
        // The parquet reader should skip parts 1, 4, and 5. Note that the actual `read_metadata`
        // always skips parts 4 and 5 because it terminates the iteration after finding both P&M.
        //
        // NOTE: Each checkpoint part is a single-row file -- guaranteed to produce one row group.
        //
        // WARNING: https://github.com/delta-incubator/delta-kernel-rs/issues/434 -- We currently
        // read parts 1 and 5 (4 in all instead of 2) because row group skipping is disabled for
        // missing columns, but can still skip part 4 because it has valid nullcount stats for P&M.
        assert_eq!(data.len(), 4);
    }
}
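To make the intended call pattern concrete, here is a rough, kernel-internal sketch of consuming `replay`. The helper name and counting logic are invented for illustration; it assumes the same imports as `log_segment.rs` above and reuses the projection from `replay_for_metadata`:

```rust
// Hypothetical kernel-internal helper (illustration only): replay just the
// Protocol/Metadata columns and count how many batches came from commit files
// versus checkpoint files.
fn count_replayed_batches(
    log_segment: &LogSegment,
    engine: &dyn Engine,
) -> DeltaResult<(usize, usize)> {
    // Same projection used by `replay_for_metadata` above.
    let schema = get_log_schema().project(&[PROTOCOL_NAME, METADATA_NAME])?;
    let (mut commits, mut checkpoints) = (0, 0);
    // Passing `None` as the meta-predicate means every commit and checkpoint
    // file in the segment is read.
    for batch in log_segment.replay(engine, schema.clone(), schema, None)? {
        // The boolean flag is true for commit-file batches, false for checkpoint batches.
        let (_data, is_commit) = batch?;
        if is_commit {
            commits += 1;
        } else {
            checkpoints += 1;
        }
    }
    Ok((commits, checkpoints))
}
```

`read_metadata` above is essentially this loop specialized to stop as soon as both a `Metadata` and a `Protocol` action have been found.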
66 changes: 31 additions & 35 deletions kernel/src/scan/mod.rs
@@ -472,42 +472,38 @@ fn transform_to_logical_internal(
    have_partition_cols: bool,
) -> DeltaResult<Box<dyn EngineData>> {
    let read_schema = global_state.read_schema.clone();
    if have_partition_cols || global_state.column_mapping_mode != ColumnMappingMode::None {
        // need to add back partition cols and/or fix-up mapped columns
        let all_fields = all_fields
            .iter()
            .map(|field| match field {
                ColumnType::Partition(field_idx) => {
                    let field = global_state
                        .logical_schema
                        .fields
                        .get_index(*field_idx)
                        .ok_or_else(|| {
                            Error::generic("logical schema did not contain expected field, can't transform data")
                        })?.1;
                    let name = field.physical_name(global_state.column_mapping_mode)?;
                    let value_expression = parse_partition_value(
                        partition_values.get(name),
                        field.data_type(),
                    )?;
                    Ok::<Expression, Error>(value_expression.into())
                }
                ColumnType::Selected(field_name) => Ok(ColumnName::new([field_name]).into()),
            })
            .try_collect()?;
        let read_expression = Expression::Struct(all_fields);
        let result = engine
            .get_expression_handler()
            .get_evaluator(
                read_schema,
                read_expression,
                global_state.logical_schema.clone().into(),
            )
            .evaluate(data.as_ref())?;
        Ok(result)
    } else {
        Ok(data)
    if !have_partition_cols && global_state.column_mapping_mode == ColumnMappingMode::None {
        return Ok(data);
    }
    // need to add back partition cols and/or fix-up mapped columns
    let all_fields = all_fields
        .iter()
        .map(|field| match field {
            ColumnType::Partition(field_idx) => {
                let field = global_state.logical_schema.fields.get_index(*field_idx);
                let Some((_, field)) = field else {
                    return Err(Error::generic(
                        "logical schema did not contain expected field, can't transform data",
                    ));
                };
                let name = field.physical_name(global_state.column_mapping_mode)?;
                let value_expression =
                    parse_partition_value(partition_values.get(name), field.data_type())?;
                Ok(value_expression.into())
            }
            ColumnType::Selected(field_name) => Ok(ColumnName::new([field_name]).into()),
        })
        .try_collect()?;
    let read_expression = Expression::Struct(all_fields);
    let result = engine
        .get_expression_handler()
        .get_evaluator(
            read_schema,
            read_expression,
            global_state.logical_schema.clone().into(),
        )
        .evaluate(data.as_ref())?;
    Ok(result)
}

// some utils that are used in file_stream.rs and state.rs tests
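The `transform_to_logical_internal` change above replaces the nested `if`/`else` with a guard clause and a `let ... else` binding. For reviewers less familiar with `let ... else`, here is a self-contained sketch of that pattern; the function and types are made up and unrelated to the kernel:

```rust
// Made-up example of the guard-clause + let-else style adopted in the refactor.
fn first_even_doubled(values: &[i32]) -> Result<i32, String> {
    // Guard clause: handle the trivial case up front and return early,
    // keeping the main path un-nested.
    if values.is_empty() {
        return Err("no values provided".to_string());
    }
    // let-else: bind the success case, or diverge (here: return an error).
    let Some(first_even) = values.iter().copied().find(|v| v % 2 == 0) else {
        return Err("no even value found".to_string());
    };
    Ok(first_even * 2)
}
```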