diff --git a/kernel/src/scan/state.rs b/kernel/src/scan/state.rs index 9bd7ec156..aa5fd5e9c 100644 --- a/kernel/src/scan/state.rs +++ b/kernel/src/scan/state.rs @@ -34,6 +34,14 @@ pub struct DvInfo { pub(crate) deletion_vector: Option, } +impl From for DvInfo { + fn from(deletion_vector: DeletionVectorDescriptor) -> Self { + DvInfo { + deletion_vector: Some(deletion_vector), + } + } +} + /// Give engines an easy way to consume stats #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] diff --git a/kernel/src/table_changes/data_read.rs b/kernel/src/table_changes/data_read.rs index ad1aba35d..bd59806f3 100644 --- a/kernel/src/table_changes/data_read.rs +++ b/kernel/src/table_changes/data_read.rs @@ -1,5 +1,3 @@ -use std::ops::Not; - use url::Url; use super::scan_file::CdfScanFileType; @@ -7,6 +5,7 @@ use crate::actions::deletion_vector::{deletion_treemap_to_bools, selection_treem use crate::table_changes::scan_file::CdfScanFile; use crate::{DeltaResult, Engine, Error}; +#[allow(unused)] struct ResolvedCdfScanFile { scan_file: CdfScanFile, selection_vector: Vec, @@ -28,7 +27,7 @@ struct ResolvedCdfScanFile { /// have the deletion vector read (if present), and each is converted into a [`ResolvedCdfScanFile`]. /// No changes are made to the `scan_type`. #[allow(unused)] -pub(crate) fn resolve_scan_file_dv( +fn resolve_scan_file_dv( engine: &dyn Engine, table_root: &Url, scan_file: CdfScanFile, @@ -51,7 +50,7 @@ pub(crate) fn resolve_scan_file_dv( } (_, Some(_), CdfScanFileType::Cdc) => { return Err(Error::generic( - "CdfScanFile with type cdccannot have a remove deletion vector", + "CdfScanFile with type cdc cannot have a remove deletion vector", )); } (add_dv, Some(rm_dv), CdfScanFileType::Add) => { @@ -94,24 +93,56 @@ pub(crate) fn resolve_scan_file_dv( scan_file: rm_scan_file, selection_vector, }); - Ok([adds, removes].into_iter().flatten()) + Ok([removes, adds].into_iter().flatten()) } #[cfg(test)] mod tests { - use std::{collections::HashMap, path::PathBuf}; + use std::{collections::HashMap, io::Write, path::PathBuf}; + use bytes::BufMut; use itertools::Itertools; + use roaring::RoaringTreemap; use crate::{ actions::deletion_vector::DeletionVectorDescriptor, engine::sync::SyncEngine, scan::state::DvInfo, table_changes::scan_file::{CdfScanFile, CdfScanFileType}, + Error, }; use super::resolve_scan_file_dv; + fn generate_dv(map: RoaringTreemap) -> DeletionVectorDescriptor { + let buf = Vec::new(); + let mut writer = buf.writer(); + let magic: u32 = 1681511377; + writer.write_all(&magic.to_le_bytes()).unwrap(); + map.serialize_into(&mut writer).unwrap(); + let buf = writer.into_inner(); + let inline_dv = z85::encode(&buf); + DeletionVectorDescriptor { + storage_type: "i".into(), + path_or_inline_dv: inline_dv, + offset: None, + size_in_bytes: buf.len().try_into().unwrap(), + cardinality: map.len().try_into().unwrap(), + } + } + + fn get_add_scan_file(dv_info: DvInfo, remove_dv: Option) -> CdfScanFile { + CdfScanFile { + scan_type: CdfScanFileType::Add, + path: "fake_path".to_string(), + dv_info, + remove_dv, + partition_values: HashMap::new(), + commit_version: 42, + commit_timestamp: 1234, + } + } + #[test] fn add_with_dv() { let engine = SyncEngine::new(); @@ -119,8 +150,6 @@ mod tests { std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap(); let table_root = url::Url::from_directory_path(path).unwrap(); - let commit_version = 42_i64; - let commit_timestamp = 1234_i64; let deletion_vector = Some(DeletionVectorDescriptor { storage_type: "u".to_string(), path_or_inline_dv: "vBn[lx{q8@P<9BNH/isA".to_string(), @@ -128,18 +157,9 @@ mod tests { size_in_bytes: 36, cardinality: 2, }); - let path = "fake_path".to_string(); let dv_info = DvInfo { deletion_vector }; let remove_dv = Some(Default::default()); - let scan_file = CdfScanFile { - scan_type: CdfScanFileType::Add, - path: path.clone(), - dv_info, - remove_dv, - partition_values: HashMap::new(), - commit_version, - commit_timestamp, - }; + let scan_file = get_add_scan_file(dv_info, remove_dv); // Remove: None deleted // Add: DV with 0th and 9th bit set (ie deleted) @@ -167,21 +187,10 @@ mod tests { size_in_bytes: 36, cardinality: 2, }); - let commit_version = 42_i64; - let commit_timestamp = 1234_i64; - let path = "fake_path".to_string(); let dv_info = Default::default(); let remove_dv = Some(DvInfo { deletion_vector }); - let scan_file = CdfScanFile { - scan_type: CdfScanFileType::Add, - path: path.clone(), - dv_info, - remove_dv, - partition_values: HashMap::new(), - commit_version, - commit_timestamp, - }; + let scan_file = get_add_scan_file(dv_info, remove_dv); // Remove: DV with 0th and 9th bit set (ie deleted) // Add: No rows deleted @@ -194,4 +203,120 @@ mod tests { .collect_vec(); assert_eq!(resolved, vec![(CdfScanFileType::Add, expected_sv)]); } + + #[test] + fn restore_subset() { + let engine = SyncEngine::new(); + let path = + std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap(); + let table_root = url::Url::from_directory_path(path).unwrap(); + + let rm_dv = generate_dv(RoaringTreemap::from([0, 1, 4, 5])); + let add_dv = generate_dv(RoaringTreemap::from([0, 5])); + + let dv_info = DvInfo::from(add_dv); + let remove_dv = Some(DvInfo::from(rm_dv)); + let scan_file = get_add_scan_file(dv_info, remove_dv); + + let mut expected_sv = vec![false; 5]; + expected_sv[1] = true; + expected_sv[4] = true; + let resolved = resolve_scan_file_dv(&engine, &table_root, scan_file) + .unwrap() + .map(|file| (file.scan_file.scan_type, file.selection_vector)) + .collect_vec(); + assert_eq!(resolved, vec![(CdfScanFileType::Add, expected_sv)]); + } + #[test] + fn delete_subset() { + let engine = SyncEngine::new(); + let path = + std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap(); + let table_root = url::Url::from_directory_path(path).unwrap(); + + let rm_dv = generate_dv(RoaringTreemap::from([0, 5])); + let add_dv = generate_dv(RoaringTreemap::from([0, 1, 4, 5])); + + let dv_info = DvInfo::from(add_dv); + let remove_dv = Some(DvInfo::from(rm_dv)); + let scan_file = get_add_scan_file(dv_info, remove_dv); + + let mut expected_sv = vec![false; 5]; + expected_sv[1] = true; + expected_sv[4] = true; + let resolved = resolve_scan_file_dv(&engine, &table_root, scan_file) + .unwrap() + .map(|file| (file.scan_file.scan_type, file.selection_vector)) + .collect_vec(); + assert_eq!(resolved, vec![(CdfScanFileType::Remove, expected_sv)]); + } + + #[test] + fn adds_and_removes() { + let engine = SyncEngine::new(); + let path = + std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap(); + let table_root = url::Url::from_directory_path(path).unwrap(); + + let rm_dv = generate_dv(RoaringTreemap::from([0, 2])); + let add_dv = generate_dv(RoaringTreemap::from([0, 1])); + + let dv_info = DvInfo::from(add_dv); + let remove_dv = Some(DvInfo::from(rm_dv)); + let scan_file = get_add_scan_file(dv_info, remove_dv); + + let mut rm_sv = vec![false; 2]; + rm_sv[1] = true; + let mut add_sv = vec![false; 3]; + add_sv[2] = true; + + let resolved = resolve_scan_file_dv(&engine, &table_root, scan_file) + .unwrap() + .map(|file| (file.scan_file.scan_type, file.selection_vector)) + .collect_vec(); + assert_eq!( + resolved, + vec![ + (CdfScanFileType::Remove, rm_sv), + (CdfScanFileType::Add, add_sv) + ] + ); + } + + #[test] + fn cdc_with_remove_dv_fails() { + let engine = SyncEngine::new(); + let path = + std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap(); + let table_root = url::Url::from_directory_path(path).unwrap(); + + let rm_dv = generate_dv(RoaringTreemap::from([0, 2])); + + let remove_dv = Some(DvInfo::from(rm_dv)); + let mut scan_file = CdfScanFile { + scan_type: CdfScanFileType::Cdc, + path: "fake_path".to_string(), + dv_info: Default::default(), + remove_dv, + partition_values: HashMap::new(), + commit_version: 42, + commit_timestamp: 1234, + }; + + let expected_err = + Error::generic("CdfScanFile with type cdc cannot have a remove deletion vector"); + + let res = resolve_scan_file_dv(&engine, &table_root, scan_file.clone()) + .err() + .unwrap(); + assert_eq!(res.to_string(), expected_err.to_string()); + + scan_file.scan_type = CdfScanFileType::Remove; + let expected_err = + Error::generic("CdfScanFile with type remove cannot have a remove deletion vector"); + let res = resolve_scan_file_dv(&engine, &table_root, scan_file) + .err() + .unwrap(); + assert_eq!(res.to_string(), expected_err.to_string()); + } }