Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve deletion vectors to find inserted and removed rows for CDF #568

Merged
merged 34 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
bd9585d
scan file implementation for cdf
OussamaSaoudi-db Nov 25, 2024
e2c2d90
change naming and update documentation for cdf scan files
OussamaSaoudi-db Dec 6, 2024
3db127f
fixup some naming
OussamaSaoudi-db Dec 6, 2024
ec6e629
inline dvinfo creation
OussamaSaoudi-db Dec 7, 2024
74eed10
Change name of expression transform
OussamaSaoudi-db Dec 7, 2024
9fe16db
Add test for null parttion columns
OussamaSaoudi-db Dec 7, 2024
8635e38
Improve testing for scan_file
OussamaSaoudi-db Dec 8, 2024
a17f4b0
Change visitor, remove (un)resolved cdf scan file
OussamaSaoudi-db Dec 8, 2024
5fab00c
Only keep track of remove dv instead of hashmap
OussamaSaoudi-db Dec 8, 2024
21553a4
Fix remove dv
OussamaSaoudi-db Dec 8, 2024
08867f8
patch rm_dv
OussamaSaoudi-db Dec 8, 2024
1228107
Update comment for CdfScanFileVisitor
OussamaSaoudi-db Dec 8, 2024
97a5790
Initial cdf read phase with deletion vector resolution
OussamaSaoudi-db Dec 6, 2024
4969e44
lazily construct empty treemaps
OussamaSaoudi-db Dec 6, 2024
dcb17fa
Change treemap handling in kernel, use selection vector, and simplify…
OussamaSaoudi-db Dec 6, 2024
ebaf225
Rebase onto scan file
OussamaSaoudi-db Dec 6, 2024
bd49142
update doc comment for dvs
OussamaSaoudi-db Dec 6, 2024
6287e6e
fix comment
OussamaSaoudi-db Dec 7, 2024
7fe4f4d
prototype of add/rm dv in cdfscanfile
OussamaSaoudi-db Dec 8, 2024
d4f95d5
Use default for dv info
OussamaSaoudi-db Dec 8, 2024
8a8f6bf
shorten tests, check error cases
OussamaSaoudi-db Dec 8, 2024
eeaabb0
Rename to physical_to_logical, add comment explaining the deletion/se…
OussamaSaoudi-db Dec 8, 2024
fa13ade
Add note about ordinary scans for dv resolution
OussamaSaoudi-db Dec 8, 2024
4dabdaf
Add doc comment to treemap_to_bools_with
OussamaSaoudi-db Dec 8, 2024
7fcb531
Update resolve_scan_file_dv docs
OussamaSaoudi-db Dec 8, 2024
577b424
Add test and docs
OussamaSaoudi-db Dec 8, 2024
a970df1
Add documentation
OussamaSaoudi-db Dec 8, 2024
5b70aab
Rename to resolve-dvs
OussamaSaoudi-db Dec 8, 2024
3e0ead6
Fixup naming and docs
OussamaSaoudi-db Dec 9, 2024
0b207c4
address pr comments
OussamaSaoudi-db Dec 9, 2024
68a4fb1
Merge branch 'main' into cdf_dv_resolve
OussamaSaoudi-db Dec 9, 2024
3259769
Add test comment, ignore sv to bools test
OussamaSaoudi-db Dec 10, 2024
2e9c29e
remove ignore from test
OussamaSaoudi-db Dec 10, 2024
5767bf1
remove comma
OussamaSaoudi-db Dec 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions kernel/src/actions/deletion_vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,19 @@ fn slice_to_u32(buf: &[u8], endian: Endian) -> DeltaResult<u32> {

/// helper function to convert a treemap into a boolean vector where, for index i, if the bit is
/// set, the vector will be false, and otherwise at index i the vector will be true
pub(crate) fn treemap_to_bools(treemap: RoaringTreemap) -> Vec<bool> {
pub(crate) fn deletion_treemap_to_bools(treemap: RoaringTreemap) -> Vec<bool> {
treemap_to_bools_with(treemap, false)
}

/// helper function to convert a treemap into a boolean vector where, for index i, if the bit is
/// set, the vector will be true, and otherwise at index i the vector will be false
pub(crate) fn selection_treemap_to_bools(treemap: RoaringTreemap) -> Vec<bool> {
treemap_to_bools_with(treemap, true)
}

/// helper function to generate vectors of bools from treemap. If `set_bit` is `true`, this is
/// [`selection_treemap_to_bools`]. If `set_bit` is false, this is [`deletion_treemap_to_bools`]
fn treemap_to_bools_with(treemap: RoaringTreemap, set_bit: bool) -> Vec<bool> {
fn combine(high_bits: u32, low_bits: u32) -> usize {
((u64::from(high_bits) << 32) | u64::from(low_bits)) as usize
}
Expand All @@ -224,12 +236,12 @@ pub(crate) fn treemap_to_bools(treemap: RoaringTreemap) -> Vec<bool> {
Some(max) => {
// there are values in the map
//TODO(nick) panic if max is > MAX_USIZE
let mut result = vec![true; max as usize + 1];
let mut result = vec![!set_bit; max as usize + 1];
let bitmaps = treemap.bitmaps();
for (index, bitmap) in bitmaps {
for bit in bitmap.iter() {
let vec_index = combine(index, bit);
result[vec_index] = false;
result[vec_index] = set_bit;
}
}
result
Expand Down Expand Up @@ -380,7 +392,7 @@ mod tests {
}

// this test is ignored by default as it's expensive to allocate such big vecs full of `true`. you can run it via:
// cargo test actions::action_definitions::tests::test_dv_to_bools
// cargo test actions::deletion_vector::tests::test_dv_to_bools -- --ignored
#[test]
#[ignore]
fn test_dv_to_bools() {
Expand All @@ -391,7 +403,7 @@ mod tests {
rb.insert(30854);
rb.insert(4294967297);
rb.insert(4294967300);
let bools = super::treemap_to_bools(rb);
let bools = super::deletion_treemap_to_bools(rb);
let mut expected = vec![true; 4294967301];
expected[0] = false;
expected[2] = false;
Expand All @@ -402,6 +414,26 @@ mod tests {
assert_eq!(bools, expected);
}

#[test]
fn test_sv_to_bools() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how expensive is this to run? if cheap, remove #[ignore] from the one above? if expensive should we ignore this one too? or actually if this is relatively expensive but we decide to keep it, maybe we should integrate it with the test above? (just check both apis in the same test?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found that it ran just fine on my machine, whereas the other one took a minute.

Giving an algorithmic complexity is hard to say tbh. I go into it in this comment, but the TLDR is that the zero-initialized large sv vector is fast to allocate. Just get a bunch of pages from the OS. The other one is slow because rust has to actually read the pages and set it to true.

I can disable it for consistency.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ohhh right i recall. how about we leave it enabled then and just leave that comment here in the code with it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The difference is pretty staggering lol. 0.85s vs 40s on my machine!!!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea let's not keep the slow one. kinda nice we got some coverage through the other path now tho :)

let mut rb = RoaringTreemap::new();
rb.insert(0);
rb.insert(2);
rb.insert(7);
rb.insert(30854);
rb.insert(4294967297);
rb.insert(4294967300);
let bools = super::selection_treemap_to_bools(rb);
let mut expected = vec![false; 4294967301];
expected[0] = true;
expected[2] = true;
expected[7] = true;
expected[30854] = true;
expected[4294967297] = true;
expected[4294967300] = true;
assert_eq!(bools, expected);
}

#[test]
fn test_dv_row_indexes() {
let example = dv_inline();
Expand Down
6 changes: 4 additions & 2 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use itertools::Itertools;
use tracing::debug;
use url::Url;

use crate::actions::deletion_vector::{split_vector, treemap_to_bools, DeletionVectorDescriptor};
use crate::actions::deletion_vector::{
deletion_treemap_to_bools, split_vector, DeletionVectorDescriptor,
};
use crate::actions::{get_log_add_schema, get_log_schema, ADD_NAME, REMOVE_NAME};
use crate::expressions::{ColumnName, Expression, ExpressionRef, Scalar};
use crate::scan::state::{DvInfo, Stats};
Expand Down Expand Up @@ -438,7 +440,7 @@ pub fn selection_vector(
) -> DeltaResult<Vec<bool>> {
let fs_client = engine.get_file_system_client();
let dv_treemap = descriptor.read(fs_client, table_root)?;
Ok(treemap_to_bools(dv_treemap))
Ok(deletion_treemap_to_bools(dv_treemap))
}

/// Transform the raw data read from parquet into the correct logical form, based on the provided
Expand Down
39 changes: 26 additions & 13 deletions kernel/src/scan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@
use std::collections::HashMap;
use std::sync::LazyLock;

use crate::actions::deletion_vector::deletion_treemap_to_bools;
use crate::utils::require;
use crate::{
actions::{
deletion_vector::{treemap_to_bools, DeletionVectorDescriptor},
visitors::visit_deletion_vector_at,
},
actions::{deletion_vector::DeletionVectorDescriptor, visitors::visit_deletion_vector_at},
engine_data::{GetData, RowVisitor, TypedGetData as _},
schema::{ColumnName, ColumnNamesAndTypes, DataType, SchemaRef},
table_features::ColumnMappingMode,
DeltaResult, Engine, EngineData, Error,
};
use roaring::RoaringTreemap;
use serde::{Deserialize, Serialize};
use tracing::warn;

Expand All @@ -30,11 +29,18 @@ pub struct GlobalScanState {
}

/// this struct can be used by an engine to materialize a selection vector
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct DvInfo {
pub(crate) deletion_vector: Option<DeletionVectorDescriptor>,
}

impl From<DeletionVectorDescriptor> for DvInfo {
fn from(deletion_vector: DeletionVectorDescriptor) -> Self {
let deletion_vector = Some(deletion_vector);
DvInfo { deletion_vector }
}
}

/// Give engines an easy way to consume stats
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
Expand All @@ -53,20 +59,27 @@ impl DvInfo {
self.deletion_vector.is_some()
}

pub fn get_selection_vector(
pub(crate) fn get_treemap(
&self,
engine: &dyn Engine,
table_root: &url::Url,
) -> DeltaResult<Option<Vec<bool>>> {
let dv_treemap = self
.deletion_vector
) -> DeltaResult<Option<RoaringTreemap>> {
self.deletion_vector
.as_ref()
.map(|dv_descriptor| {
let fs_client = engine.get_file_system_client();
dv_descriptor.read(fs_client, table_root)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just double checking: dv_descriptor.read takes care of slicing the DV when offset and size cover a subset of the DV file?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe it does. It just handles reading the on-disk file and turning it into a roaring treemap. Something at a higher level needs to slice it up if needed.

})
.transpose()?;
Ok(dv_treemap.map(treemap_to_bools))
.transpose()
}

pub fn get_selection_vector(
&self,
engine: &dyn Engine,
table_root: &url::Url,
) -> DeltaResult<Option<Vec<bool>>> {
let dv_treemap = self.get_treemap(engine, table_root)?;
Ok(dv_treemap.map(deletion_treemap_to_bools))
}

/// Returns a vector of row indexes that should be *removed* from the result set
Expand Down Expand Up @@ -112,11 +125,11 @@ pub type ScanCallback<T> = fn(
/// ## Example
/// ```ignore
/// let mut context = [my context];
/// for res in scan_data { // scan data from scan.get_scan_data()
/// for res in scan_data { // scan data from scan.scan_data()
/// let (data, vector) = res?;
/// context = delta_kernel::scan::state::visit_scan_files(
/// data.as_ref(),
/// vector,
/// selection_vector,
/// context,
/// my_callback,
/// )?;
Expand Down
32 changes: 11 additions & 21 deletions kernel/src/table_changes/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@ use crate::actions::{
PROTOCOL_NAME, REMOVE_NAME,
};
use crate::engine_data::{GetData, TypedGetData};
use crate::expressions::{column_expr, column_name, ColumnName, Expression};
use crate::expressions::{column_name, ColumnName};
use crate::path::ParsedLogPath;
use crate::scan::data_skipping::DataSkippingFilter;
use crate::scan::scan_row_schema;
use crate::scan::state::DvInfo;
use crate::schema::{ArrayType, ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructType};
use crate::table_changes::scan_file::{cdf_scan_row_expression, cdf_scan_row_schema};
use crate::table_changes::{check_cdf_table_properties, ensure_cdf_read_supported};
use crate::table_properties::TableProperties;
use crate::utils::require;
use crate::{DeltaResult, Engine, EngineData, Error, ExpressionRef, RowVisitor};

use itertools::Itertools;

#[cfg(test)]
Expand All @@ -36,7 +37,7 @@ pub(crate) struct TableChangesScanData {
pub(crate) scan_data: Box<dyn EngineData>,
/// The selection vector used to filter the `scan_data`.
pub(crate) selection_vector: Vec<bool>,
/// An map from a remove action's path to its deletion vector
/// A map from a remove action's path to its deletion vector
pub(crate) remove_dvs: Arc<HashMap<String, DvInfo>>,
}

Expand Down Expand Up @@ -65,21 +66,6 @@ pub(crate) fn table_changes_action_iter(
Ok(result)
}

// Gets the expression for generating the engine data in [`TableChangesScanData`].
//
// TODO: This expression is temporary. In the future it will also select `cdc` and `remove` actions
// fields.
fn add_transform_expr() -> Expression {
Expression::Struct(vec![
column_expr!("add.path"),
column_expr!("add.size"),
column_expr!("add.modificationTime"),
column_expr!("add.stats"),
column_expr!("add.deletionVector"),
Expression::Struct(vec![column_expr!("add.partitionValues")]),
])
}

/// Processes a single commit file from the log to generate an iterator of [`TableChangesScanData`].
/// The scanner operates in two phases that _must_ be performed in the following order:
/// 1. Prepare phase [`LogReplayScanner::try_new`]: This iterates over every action in the commit.
Expand Down Expand Up @@ -237,7 +223,7 @@ impl LogReplayScanner {
remove_dvs,
commit_file,
// TODO: Add the timestamp as a column with an expression
timestamp: _,
timestamp,
} = self;
let remove_dvs = Arc::new(remove_dvs);

Expand All @@ -247,10 +233,14 @@ impl LogReplayScanner {
schema,
None,
)?;
let commit_version = commit_file
.version
.try_into()
.map_err(|_| Error::generic("Failed to convert commit version to i64"))?;
let evaluator = engine.get_expression_handler().get_evaluator(
get_log_add_schema().clone(),
add_transform_expr(),
scan_row_schema().into(),
cdf_scan_row_expression(timestamp, commit_version),
cdf_scan_row_schema().into(),
);

let result = action_iter.map(move |actions| -> DeltaResult<_> {
Expand Down
2 changes: 2 additions & 0 deletions kernel/src/table_changes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use crate::utils::require;
use crate::{DeltaResult, Engine, Error, Version};

mod log_replay;
mod resolve_dvs;
pub mod scan;
mod scan_file;

static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| {
[
Expand Down
Loading
Loading