Skip to content

Commit

Permalink
shorten tests, check error cases
Browse files Browse the repository at this point in the history
  • Loading branch information
OussamaSaoudi-db committed Dec 8, 2024
1 parent e11e654 commit 027a73a
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 30 deletions.
8 changes: 8 additions & 0 deletions kernel/src/scan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ pub struct DvInfo {
pub(crate) deletion_vector: Option<DeletionVectorDescriptor>,
}

impl From<DeletionVectorDescriptor> for DvInfo {
fn from(deletion_vector: DeletionVectorDescriptor) -> Self {
DvInfo {
deletion_vector: Some(deletion_vector),
}
}
}

/// Give engines an easy way to consume stats
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
Expand Down
185 changes: 155 additions & 30 deletions kernel/src/table_changes/data_read.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::ops::Not;

use url::Url;

use super::scan_file::CdfScanFileType;
use crate::actions::deletion_vector::{deletion_treemap_to_bools, selection_treemap_to_bools};
use crate::table_changes::scan_file::CdfScanFile;
use crate::{DeltaResult, Engine, Error};

#[allow(unused)]
struct ResolvedCdfScanFile {
scan_file: CdfScanFile,
selection_vector: Vec<bool>,
Expand All @@ -28,7 +27,7 @@ struct ResolvedCdfScanFile {
/// have the deletion vector read (if present), and each is converted into a [`ResolvedCdfScanFile`].
/// No changes are made to the `scan_type`.
#[allow(unused)]
pub(crate) fn resolve_scan_file_dv(
fn resolve_scan_file_dv(
engine: &dyn Engine,
table_root: &Url,
scan_file: CdfScanFile,
Expand All @@ -51,7 +50,7 @@ pub(crate) fn resolve_scan_file_dv(
}
(_, Some(_), CdfScanFileType::Cdc) => {
return Err(Error::generic(
"CdfScanFile with type cdccannot have a remove deletion vector",
"CdfScanFile with type cdc cannot have a remove deletion vector",
));
}
(add_dv, Some(rm_dv), CdfScanFileType::Add) => {
Expand Down Expand Up @@ -94,52 +93,73 @@ pub(crate) fn resolve_scan_file_dv(
scan_file: rm_scan_file,
selection_vector,
});
Ok([adds, removes].into_iter().flatten())
Ok([removes, adds].into_iter().flatten())
}

#[cfg(test)]
mod tests {
use std::{collections::HashMap, path::PathBuf};
use std::{collections::HashMap, io::Write, path::PathBuf};

use bytes::BufMut;
use itertools::Itertools;
use roaring::RoaringTreemap;

use crate::{
actions::deletion_vector::DeletionVectorDescriptor,
engine::sync::SyncEngine,
scan::state::DvInfo,
table_changes::scan_file::{CdfScanFile, CdfScanFileType},
Error,
};

use super::resolve_scan_file_dv;

fn generate_dv(map: RoaringTreemap) -> DeletionVectorDescriptor {
let buf = Vec::new();
let mut writer = buf.writer();
let magic: u32 = 1681511377;
writer.write_all(&magic.to_le_bytes()).unwrap();
map.serialize_into(&mut writer).unwrap();
let buf = writer.into_inner();
let inline_dv = z85::encode(&buf);
DeletionVectorDescriptor {
storage_type: "i".into(),
path_or_inline_dv: inline_dv,
offset: None,
size_in_bytes: buf.len().try_into().unwrap(),
cardinality: map.len().try_into().unwrap(),
}
}

fn get_add_scan_file(dv_info: DvInfo, remove_dv: Option<DvInfo>) -> CdfScanFile {
CdfScanFile {
scan_type: CdfScanFileType::Add,
path: "fake_path".to_string(),
dv_info,
remove_dv,
partition_values: HashMap::new(),
commit_version: 42,
commit_timestamp: 1234,
}
}

#[test]
fn add_with_dv() {
let engine = SyncEngine::new();
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
let table_root = url::Url::from_directory_path(path).unwrap();

let commit_version = 42_i64;
let commit_timestamp = 1234_i64;
let deletion_vector = Some(DeletionVectorDescriptor {
storage_type: "u".to_string(),
path_or_inline_dv: "vBn[lx{q8@P<9BNH/isA".to_string(),
offset: Some(1),
size_in_bytes: 36,
cardinality: 2,
});
let path = "fake_path".to_string();
let dv_info = DvInfo { deletion_vector };
let remove_dv = Some(Default::default());
let scan_file = CdfScanFile {
scan_type: CdfScanFileType::Add,
path: path.clone(),
dv_info,
remove_dv,
partition_values: HashMap::new(),
commit_version,
commit_timestamp,
};
let scan_file = get_add_scan_file(dv_info, remove_dv);

// Remove: None deleted
// Add: DV with 0th and 9th bit set (ie deleted)
Expand Down Expand Up @@ -167,21 +187,10 @@ mod tests {
size_in_bytes: 36,
cardinality: 2,
});
let commit_version = 42_i64;
let commit_timestamp = 1234_i64;

let path = "fake_path".to_string();
let dv_info = Default::default();
let remove_dv = Some(DvInfo { deletion_vector });
let scan_file = CdfScanFile {
scan_type: CdfScanFileType::Add,
path: path.clone(),
dv_info,
remove_dv,
partition_values: HashMap::new(),
commit_version,
commit_timestamp,
};
let scan_file = get_add_scan_file(dv_info, remove_dv);

// Remove: DV with 0th and 9th bit set (ie deleted)
// Add: No rows deleted
Expand All @@ -194,4 +203,120 @@ mod tests {
.collect_vec();
assert_eq!(resolved, vec![(CdfScanFileType::Add, expected_sv)]);
}

#[test]
fn restore_subset() {
let engine = SyncEngine::new();
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
let table_root = url::Url::from_directory_path(path).unwrap();

let rm_dv = generate_dv(RoaringTreemap::from([0, 1, 4, 5]));
let add_dv = generate_dv(RoaringTreemap::from([0, 5]));

let dv_info = DvInfo::from(add_dv);
let remove_dv = Some(DvInfo::from(rm_dv));
let scan_file = get_add_scan_file(dv_info, remove_dv);

let mut expected_sv = vec![false; 5];
expected_sv[1] = true;
expected_sv[4] = true;
let resolved = resolve_scan_file_dv(&engine, &table_root, scan_file)
.unwrap()
.map(|file| (file.scan_file.scan_type, file.selection_vector))
.collect_vec();
assert_eq!(resolved, vec![(CdfScanFileType::Add, expected_sv)]);
}
#[test]
fn delete_subset() {
let engine = SyncEngine::new();
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
let table_root = url::Url::from_directory_path(path).unwrap();

let rm_dv = generate_dv(RoaringTreemap::from([0, 5]));
let add_dv = generate_dv(RoaringTreemap::from([0, 1, 4, 5]));

let dv_info = DvInfo::from(add_dv);
let remove_dv = Some(DvInfo::from(rm_dv));
let scan_file = get_add_scan_file(dv_info, remove_dv);

let mut expected_sv = vec![false; 5];
expected_sv[1] = true;
expected_sv[4] = true;
let resolved = resolve_scan_file_dv(&engine, &table_root, scan_file)
.unwrap()
.map(|file| (file.scan_file.scan_type, file.selection_vector))
.collect_vec();
assert_eq!(resolved, vec![(CdfScanFileType::Remove, expected_sv)]);
}

#[test]
fn adds_and_removes() {
let engine = SyncEngine::new();
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
let table_root = url::Url::from_directory_path(path).unwrap();

let rm_dv = generate_dv(RoaringTreemap::from([0, 2]));
let add_dv = generate_dv(RoaringTreemap::from([0, 1]));

let dv_info = DvInfo::from(add_dv);
let remove_dv = Some(DvInfo::from(rm_dv));
let scan_file = get_add_scan_file(dv_info, remove_dv);

let mut rm_sv = vec![false; 2];
rm_sv[1] = true;
let mut add_sv = vec![false; 3];
add_sv[2] = true;

let resolved = resolve_scan_file_dv(&engine, &table_root, scan_file)
.unwrap()
.map(|file| (file.scan_file.scan_type, file.selection_vector))
.collect_vec();
assert_eq!(
resolved,
vec![
(CdfScanFileType::Remove, rm_sv),
(CdfScanFileType::Add, add_sv)
]
);
}

#[test]
fn cdc_with_remove_dv_fails() {
let engine = SyncEngine::new();
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
let table_root = url::Url::from_directory_path(path).unwrap();

let rm_dv = generate_dv(RoaringTreemap::from([0, 2]));

let remove_dv = Some(DvInfo::from(rm_dv));
let mut scan_file = CdfScanFile {
scan_type: CdfScanFileType::Cdc,
path: "fake_path".to_string(),
dv_info: Default::default(),
remove_dv,
partition_values: HashMap::new(),
commit_version: 42,
commit_timestamp: 1234,
};

let expected_err =
Error::generic("CdfScanFile with type cdc cannot have a remove deletion vector");

let res = resolve_scan_file_dv(&engine, &table_root, scan_file.clone())
.err()
.unwrap();
assert_eq!(res.to_string(), expected_err.to_string());

scan_file.scan_type = CdfScanFileType::Remove;
let expected_err =
Error::generic("CdfScanFile with type remove cannot have a remove deletion vector");
let res = resolve_scan_file_dv(&engine, &table_root, scan_file)
.err()
.unwrap();
assert_eq!(res.to_string(), expected_err.to_string());
}
}

0 comments on commit 027a73a

Please sign in to comment.