From e733bae6a8bc7204183f6b467f4fe8d4a38b68b1 Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Sat, 27 Jan 2024 09:51:24 -0800
Subject: [PATCH] fix: do not panic when performing a pushdown scan on a multi-data-file fragment (#1873)

The normal scan algorithm is:

```
open fragment reader with projected schema
for batch in batches:
    scan batch
```

The pushdown algorithm is:

```
open fragment with full schema
for batch, simplified_projection in filter(batches):
    projected scan batch(simplified_projection)
```

This means that the data files that need to be read could change from batch to batch. This was not previously being accounted for and now it is.

Closes #1871
---
 python/python/tests/test_dataset.py | 21 +++++++++++
 rust/lance/src/dataset/fragment.rs  | 54 +++++++++++++++++++++--------
 2 files changed, 60 insertions(+), 15 deletions(-)

diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py
index c982a3ee8c..9f40a998a7 100644
--- a/python/python/tests/test_dataset.py
+++ b/python/python/tests/test_dataset.py
@@ -619,6 +619,27 @@ def test_merge_with_commit(tmp_path: Path):
     assert tbl == expected
 
 
+def test_merge_search(tmp_path: Path):
+    left_table = pa.Table.from_pydict({"id": [1, 2, 3], "left": ["a", "b", "c"]})
+    right_table = pa.Table.from_pydict({"id": [1, 2, 3], "right": ["A", "B", "C"]})
+
+    left_ds = lance.write_dataset(left_table, tmp_path / "left")
+
+    right_ds = lance.write_dataset(right_table, tmp_path / "right")
+    left_ds.merge(right_ds, "id")
+
+    full = left_ds.to_table()
+    full_filtered = left_ds.to_table(filter="id < 3")
+
+    partial = left_ds.to_table(columns=["left"])
+
+    assert full.column("left") == partial.column("left")
+
+    partial = left_ds.to_table(columns=["left"], filter="id < 3")
+
+    assert full_filtered.column("left") == partial.column("left")
+
+
 def test_data_files(tmp_path: Path):
     table = pa.Table.from_pydict({"a": range(100), "b": range(100)})
     base_dir = tmp_path / "test"
diff --git 
a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs
index 4f8fc24237..d8a39e6fe9 100644
--- a/rust/lance/src/dataset/fragment.rs
+++ b/rust/lance/src/dataset/fragment.rs
@@ -723,6 +723,9 @@ pub struct FragmentReader {
 
     /// ID of the fragment
     fragment_id: usize,
+
+    /// True if we are reading the row id
+    with_row_id: bool,
 }
 
 impl std::fmt::Display for FragmentReader {
@@ -772,10 +775,12 @@ impl FragmentReader {
             readers,
             deletion_vec,
             fragment_id,
+            with_row_id: false,
         })
     }
 
     pub(crate) fn with_row_id(&mut self) -> &mut Self {
+        self.with_row_id = true;
         self.readers[0].0.with_row_id(true);
         self
     }
@@ -856,22 +861,41 @@ impl FragmentReader {
         params: impl Into<ReadBatchParams> + Clone,
         projection: &Schema,
     ) -> Result<RecordBatch> {
-        let read_tasks = self.readers.iter().map(|(reader, schema)| {
-            let projection = schema.intersection(projection);
-            let params = params.clone();
-
-            async move {
-                reader
-                    .read_batch(
-                        batch_id as i32,
-                        params,
-                        &projection?,
-                        self.deletion_vec.as_ref().map(|dv| dv.as_ref()),
-                    )
-                    .await
-            }
-        });
+        let read_tasks = self
+            .readers
+            .iter()
+            .enumerate()
+            .map(|(reader_idx, (reader, schema))| {
+                let projection = schema.intersection(projection);
+                let params = params.clone();
+
+                async move {
+                    // Apply ? inside the task to keep read_tasks a simple iter of futures
+                    // for try_join_all
+                    let projection = projection?;
+                    // We always get the row_id from the first reader and so we need that even
+                    // if the projection is empty
+                    let need_for_row_id = self.with_row_id && reader_idx == 0;
+                    if projection.fields.is_empty() && !need_for_row_id {
+                        // The projection caused one of the data files to become
+                        // irrelevant and so we can skip it
+                        Result::Ok(None)
+                    } else {
+                        Ok(Some(
+                            reader
+                                .read_batch(
+                                    batch_id as i32,
+                                    params,
+                                    &projection,
+                                    self.deletion_vec.as_ref().map(|dv| dv.as_ref()),
+                                )
+                                .await?,
+                        ))
+                    }
+                }
+            });
         let batches = try_join_all(read_tasks).await?;
+        let batches = batches.into_iter().flatten().collect::<Vec<_>>();
         let result = merge_batches(&batches)?;
 
         Ok(result)