diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index 2d26de785..0902bd2d0 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -290,6 +290,8 @@ fn read_scan_file( physical_to_logical_expr, global_state.logical_schema.clone().into(), ); + // Determine if the scan file was derived from a deletion vector pair + let is_dv_resolved_pair = scan_file.remove_dv.is_some(); let table_root = Url::parse(&global_state.table_root)?; let location = table_root.join(&scan_file.path)?; @@ -314,7 +316,31 @@ fn read_scan_file( // trying to return a captured variable. We're going to reassign `selection_vector` // to `rest` in a moment anyway let mut sv = selection_vector.take(); - let rest = split_vector(sv.as_mut(), len, None); + + // Gets the selection vector for a data batch with length `len`. There are three cases to + // consider: + // 1. A scan file derived from a deletion vector pair getting resolved. + // 2. A scan file that was not the result of a resolved pair, and has a deletion vector. + // 3. A scan file that was not the result of a resolved pair, and has no deletion vector. + // + // # Case 1 + // If the scan file is derived from a deletion vector pair, its selection vector should be + // extended with `false`. Consider a resolved selection vector `[0, 1]`. Only row 1 has + // changed. If there were more rows (for example 4 total), then none of them have changed. + // Hence, the selection vector is extended to become `[0, 1, 0, 0]`. + // + // # Case 2 + // If the scan file has a deletion vector but is unpaired, its selection vector should be + // extended with `true`. Consider a deletion vector with row 1 deleted. This generates a + // selection vector `[1, 0, 1]`. Only row 1 is deleted. Rows 0 and 2 are selected. If there + // are more rows (for example 4), then all the extra rows should be selected. The selection + // vector becomes `[1, 0, 1, 1]`. 
+ // + // # Case 3 + // These scan files are either simple adds, removes, or cdc files. This case is a no-op because + // the selection vector is `None`. + let extend = Some(!is_dv_resolved_pair); + let rest = split_vector(sv.as_mut(), len, extend); let result = ScanResult { raw_data: logical, raw_mask: sv, diff --git a/kernel/tests/cdf.rs b/kernel/tests/cdf.rs index c93b22d24..5263df960 100644 --- a/kernel/tests/cdf.rs +++ b/kernel/tests/cdf.rs @@ -54,6 +54,14 @@ fn read_cdf_for_table( #[test] fn cdf_with_deletion_vector() -> Result<(), Box> { let batches = read_cdf_for_table("cdf-table-with-dv", 0, None)?; + // Each commit performs the following: + // 0. Insert 0..=9 + // 1. Remove [0, 9] + // 2. Restore [0, 9] + // 3. Remove [0, 1, 4, 5] + // 4. Restore [1, 4] + // 5. Restore [0, 5] and Remove [3] + // 6. Restore [3] let mut expected = vec![ "+-------+--------------+-----------------+", "| value | _change_type | _commit_version |", @@ -65,13 +73,23 @@ fn cdf_with_deletion_vector() -> Result<(), Box> { "| 4 | insert | 0 |", "| 5 | insert | 0 |", "| 6 | insert | 0 |", - "| 8 | insert | 0 |", "| 7 | insert | 0 |", + "| 8 | insert | 0 |", "| 9 | insert | 0 |", "| 0 | delete | 1 |", "| 9 | delete | 1 |", "| 0 | insert | 2 |", "| 9 | insert | 2 |", + "| 0 | delete | 3 |", + "| 1 | delete | 3 |", + "| 4 | delete | 3 |", + "| 5 | delete | 3 |", + "| 1 | insert | 4 |", + "| 4 | insert | 4 |", + "| 3 | delete | 5 |", + "| 0 | insert | 5 |", + "| 5 | insert | 5 |", + "| 3 | insert | 6 |", "+-------+--------------+-----------------+", ]; sort_lines!(expected); diff --git a/kernel/tests/data/cdf-table-with-dv.tar.zst b/kernel/tests/data/cdf-table-with-dv.tar.zst index aaef3a3eb..76c9f09da 100644 Binary files a/kernel/tests/data/cdf-table-with-dv.tar.zst and b/kernel/tests/data/cdf-table-with-dv.tar.zst differ