diff --git a/rust/lance-datagen/src/generator.rs b/rust/lance-datagen/src/generator.rs index fe2806b637..76119202ea 100644 --- a/rust/lance-datagen/src/generator.rs +++ b/rust/lance-datagen/src/generator.rs @@ -555,6 +555,47 @@ impl ArrayGenerator for RandomBytesGenerato } } +// This is pretty much the same thing as RandomBinaryGenerator but we can't use that +// because there is no ArrowPrimitiveType for FixedSizeBinary +pub struct RandomFixedSizeBinaryGenerator { + data_type: DataType, + size: i32, +} + +impl RandomFixedSizeBinaryGenerator { + fn new(size: i32) -> Self { + Self { + size, + data_type: DataType::FixedSizeBinary(size), + } + } +} + +impl ArrayGenerator for RandomFixedSizeBinaryGenerator { + fn generate( + &mut self, + length: RowCount, + rng: &mut rand_xoshiro::Xoshiro256PlusPlus, + ) -> Result, ArrowError> { + let num_bytes = length.0 * self.size as u64; + let mut bytes = vec![0; num_bytes as usize]; + rng.fill_bytes(&mut bytes); + Ok(Arc::new(FixedSizeBinaryArray::new( + self.size, + Buffer::from(bytes), + None, + ))) + } + + fn data_type(&self) -> &DataType { + &self.data_type + } + + fn element_size_bytes(&self) -> Option { + Some(ByteCount::from(self.size as u64)) + } +} + pub struct RandomBinaryGenerator { bytes_per_element: ByteCount, scale_to_utf8: bool, @@ -1416,6 +1457,10 @@ pub mod array { Box::new(RandomBytesGenerator::::new(data_type)) } + pub fn rand_fsb(size: i32) -> Box { + Box::new(RandomFixedSizeBinaryGenerator::new(size)) + } + /// Create a generator of randomly sampled date32 values /// /// Instead of sampling the entire range, all values will be drawn from the last year as this @@ -1610,6 +1655,7 @@ pub mod array { rand_type(child.data_type()), Dimension::from(*dimension as u32), ), + DataType::FixedSizeBinary(size) => rand_fsb(*size), DataType::List(child) => rand_list(child.data_type()), DataType::Duration(unit) => match unit { TimeUnit::Second => rand::(), diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 7437ebb0ad..ec1fd3c44d 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -292,7 +292,7 @@ impl DecodeBatchScheduler { } else { match data_type { // DataType::is_primitive doesn't consider these primitive but we do - DataType::Boolean | DataType::Null => true, + DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true, DataType::FixedSizeList(inner, _) => Self::is_primitive(inner.data_type()), _ => false, } diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 400b89b64c..e5314cd2d3 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -196,6 +196,7 @@ impl BatchEncoder { | DataType::UInt32 | DataType::UInt64 | DataType::UInt8 + | DataType::FixedSizeBinary(_) | DataType::FixedSizeList(_, _) => { let my_col_idx = *col_idx; *col_idx += 1; diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index d1f89e31c6..8e637332a5 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -14,7 +14,7 @@ use arrow_array::{ TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }, - ArrayRef, BooleanArray, FixedSizeListArray, PrimitiveArray, + ArrayRef, BooleanArray, FixedSizeBinaryArray, FixedSizeListArray, PrimitiveArray, }; use arrow_buffer::{BooleanBuffer, 
Buffer, NullBuffer, ScalarBuffer}; use arrow_schema::{DataType, IntervalUnit, TimeUnit}; @@ -356,6 +356,17 @@ impl PrimitiveFieldDecodeTask { DataType::UInt8 => Ok(Self::new_primitive_array::( buffers, num_rows, data_type, )), + DataType::FixedSizeBinary(dimension) => { + let mut buffers_iter = buffers.into_iter(); + let fsb_validity = buffers_iter.next().unwrap(); + let fsb_nulls = Self::bytes_to_validity(fsb_validity, num_rows); + + let fsb_values = buffers_iter.next().unwrap(); + let fsb_values = Buffer::from_bytes(fsb_values.freeze().into()); + Ok(Arc::new(FixedSizeBinaryArray::new( + *dimension, fsb_values, fsb_nulls, + ))) + } DataType::FixedSizeList(items, dimension) => { let mut buffers_iter = buffers.into_iter(); let fsl_validity = buffers_iter.next().unwrap(); diff --git a/rust/lance-encoding/src/encodings/physical/value.rs b/rust/lance-encoding/src/encodings/physical/value.rs index 6d4c448d6f..6e1240c796 100644 --- a/rust/lance-encoding/src/encodings/physical/value.rs +++ b/rust/lance-encoding/src/encodings/physical/value.rs @@ -130,17 +130,17 @@ pub struct ValueEncoder { impl ValueEncoder { pub fn try_new(data_type: &DataType) -> Result { - if data_type.is_primitive() { + if *data_type == DataType::Boolean { Ok(Self { - buffer_encoder: Box::::default(), + buffer_encoder: Box::::default(), }) - } else if *data_type == DataType::Boolean { + } else if data_type.is_fixed_stride() { Ok(Self { - buffer_encoder: Box::::default(), + buffer_encoder: Box::::default(), }) } else { Err(Error::invalid_input( - format!("Cannot use value encoded to encode {}", data_type), + format!("Cannot use ValueEncoder to encode {}", data_type), location!(), )) } @@ -189,6 +189,7 @@ pub(crate) mod tests { use crate::testing::check_round_trip_encoding_random; const PRIMITIVE_TYPES: &[DataType] = &[ + DataType::FixedSizeBinary(2), DataType::Date32, DataType::Date64, DataType::Int8, diff --git a/rust/lance-io/src/lib.rs b/rust/lance-io/src/lib.rs index e2f34a42b9..1fdea71782 100644 --- a/rust/lance-io/src/lib.rs +++ b/rust/lance-io/src/lib.rs @@ -36,6 +36,28 @@ pub enum ReadBatchParams { Indices(UInt32Array), } +impl std::fmt::Display for ReadBatchParams { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Self::Range(r) => write!(f, "Range({}..{})", r.start, r.end), + Self::RangeFull => write!(f, "RangeFull"), + Self::RangeTo(r) => write!(f, "RangeTo({})", r.end), + Self::RangeFrom(r) => write!(f, "RangeFrom({})", r.start), + Self::Indices(indices) => { + let mut indices_str = indices.values().iter().fold(String::new(), |mut acc, v| { + acc.push_str(&v.to_string()); + acc.push(','); + acc + }); + if !indices_str.is_empty() { + indices_str.pop(); + } + write!(f, "Indices({})", indices_str) + } + } + } +} + impl Default for ReadBatchParams { fn default() -> Self { // Default of ReadBatchParams is reading the full batch. diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index bf9695a12c..815483deaa 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -2112,32 +2112,32 @@ mod tests { t } - async fn create_file(path: &std::path::Path, mode: WriteMode) { - let schema = Arc::new(ArrowSchema::new(vec![ - Field::new("i", DataType::Int32, false), - Field::new( + async fn create_file(path: &std::path::Path, mode: WriteMode, use_experimental_writer: bool) { + let mut fields = vec![Field::new("i", DataType::Int32, false)]; + // TODO (GH-2347): currently the v2 writer does not support dictionary columns. 
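Reviewer note: the new `RandomFixedSizeBinaryGenerator` above works by filling one contiguous `rows * size` byte buffer from the RNG and wrapping it in a `FixedSizeBinaryArray` with no null buffer. A minimal standalone sketch of the same pattern, assuming matching `rand`/`rand_xoshiro` versions so the `RngCore`/`SeedableRng` re-exports line up:

```rust
use arrow_array::FixedSizeBinaryArray;
use arrow_buffer::Buffer;
use rand::{RngCore, SeedableRng};

/// Fill `rows * size` bytes from the RNG and wrap them as a FixedSizeBinaryArray.
/// FixedSizeBinary has no offsets buffer, so a single flat values buffer is enough.
fn random_fsb(
    size: i32,
    rows: u64,
    rng: &mut rand_xoshiro::Xoshiro256PlusPlus,
) -> FixedSizeBinaryArray {
    let mut bytes = vec![0u8; (rows * size as u64) as usize];
    rng.fill_bytes(&mut bytes);
    // No null buffer: every generated value is valid, mirroring the generator above.
    FixedSizeBinaryArray::new(size, Buffer::from(bytes), None)
}

fn main() {
    let mut rng = rand_xoshiro::Xoshiro256PlusPlus::seed_from_u64(42);
    let arr = random_fsb(16, 4, &mut rng);
    assert_eq!(arr.len(), 4);
    assert_eq!(arr.value_length(), 16);
}
```

Because every value has the same width, `element_size_bytes` can report an exact per-row size, which is what lets the encoder treat `FixedSizeBinary` as a fixed-stride type in the changes below.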
+ if !use_experimental_writer { + fields.push(Field::new( "dict", DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)), false, - ), - ])); + )); + } + let schema = Arc::new(ArrowSchema::new(fields)); let dict_values = StringArray::from_iter_values(["a", "b", "c", "d", "e"]); let batches: Vec = (0..20) .map(|i| { - RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(Int32Array::from_iter_values(i * 20..(i + 1) * 20)), - Arc::new( - DictionaryArray::try_new( - UInt16Array::from_iter_values((0_u16..20_u16).map(|v| v % 5)), - Arc::new(dict_values.clone()), - ) - .unwrap(), - ), - ], - ) - .unwrap() + let mut arrays = + vec![Arc::new(Int32Array::from_iter_values(i * 20..(i + 1) * 20)) as ArrayRef]; + if !use_experimental_writer { + arrays.push(Arc::new( + DictionaryArray::try_new( + UInt16Array::from_iter_values((0_u16..20_u16).map(|v| v % 5)), + Arc::new(dict_values.clone()), + ) + .unwrap(), + )); + } + RecordBatch::try_new(schema.clone(), arrays).unwrap() }) .collect(); let expected_batches = batches.clone(); @@ -2147,6 +2147,7 @@ mod tests { max_rows_per_file: 40, max_rows_per_group: 10, mode, + use_experimental_writer, ..WriteParams::default() }; let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -2173,8 +2174,11 @@ mod tests { .unwrap(); // The batch size batches the group size. - for batch in &actual_batches { - assert_eq!(batch.num_rows(), 10); + // (the v2 writer has no concept of group size) + if !use_experimental_writer { + for batch in &actual_batches { + assert_eq!(batch.num_rows(), 10); + } } // sort @@ -2199,17 +2203,21 @@ mod tests { ) } + #[rstest] #[lance_test_macros::test(tokio::test)] - async fn test_create_dataset() { + async fn test_create_dataset(#[values(false, true)] use_experimental_writer: bool) { // Appending / Overwriting a dataset that does not exist is treated as Create for mode in [WriteMode::Create, WriteMode::Append, Overwrite] { let test_dir = tempdir().unwrap(); - create_file(test_dir.path(), mode).await + create_file(test_dir.path(), mode, use_experimental_writer).await } } + #[rstest] #[lance_test_macros::test(tokio::test)] - async fn test_create_and_fill_empty_dataset() { + async fn test_create_and_fill_empty_dataset( + #[values(false, true)] use_experimental_writer: bool, + ) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); let schema = Arc::new(ArrowSchema::new(vec![Field::new( @@ -2233,6 +2241,7 @@ mod tests { let mut write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, + use_experimental_writer, ..Default::default() }; // We should be able to append even if the metadata doesn't exactly match. 
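Reviewer note: the test changes in this file all follow the same shape: `#[rstest]` with `#[values(false, true)]` expands each test into a v1 and a v2 case, and the flag is forwarded into `WriteParams`. A self-contained sketch of that pattern, assuming the public `lance::dataset::{Dataset, WriteParams}` re-exports used elsewhere in the crate:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchIterator};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use lance::dataset::{Dataset, WriteParams};
use rstest::rstest;

// One test function, two generated cases: use_experimental_writer = false and true.
#[rstest]
#[tokio::test]
async fn write_with_either_writer(#[values(false, true)] use_experimental_writer: bool) {
    let test_dir = tempfile::tempdir().unwrap();
    let test_uri = test_dir.path().to_str().unwrap();

    let schema = Arc::new(ArrowSchema::new(vec![Field::new("i", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from_iter_values(0..100)) as ArrayRef],
    )
    .unwrap();
    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());

    let params = WriteParams {
        max_rows_per_file: 40,
        max_rows_per_group: 10, // only meaningful for the v1 writer; v2 has no groups
        use_experimental_writer,
        ..Default::default()
    };
    let dataset = Dataset::write(reader, test_uri, Some(params)).await.unwrap();
    assert_eq!(dataset.count_rows(None).await.unwrap(), 100);
}
```

`max_rows_per_group` is kept for the v1 case; the v2 writer ignores it, which is why the group-size assertions above are now skipped when the flag is set.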
@@ -2287,8 +2296,9 @@ mod tests { assert_eq!(&expected_struct_arr, as_struct_array(sorted_arr.as_ref())); } + #[rstest] #[lance_test_macros::test(tokio::test)] - async fn test_create_with_empty_iter() { + async fn test_create_with_empty_iter(#[values(false, true)] use_experimental_writer: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); let schema = Arc::new(ArrowSchema::new(vec![Field::new( @@ -2299,7 +2309,13 @@ mod tests { let reader = RecordBatchIterator::new(vec![].into_iter().map(Ok), schema.clone()); // check schema of reader and original is same assert_eq!(schema.as_ref(), reader.schema().as_ref()); - let result = Dataset::write(reader, test_uri, None).await.unwrap(); + let write_params = Some(WriteParams { + use_experimental_writer, + ..Default::default() + }); + let result = Dataset::write(reader, test_uri, write_params) + .await + .unwrap(); // check dataset empty assert_eq!(result.count_rows(None).await.unwrap(), 0); @@ -2307,8 +2323,9 @@ mod tests { assert_eq!(result.manifest.max_fragment_id(), None); } + #[rstest] #[tokio::test] - async fn test_write_params() { + async fn test_write_params(#[values(false, true)] use_experimental_writer: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -2329,6 +2346,7 @@ mod tests { let write_params = WriteParams { max_rows_per_file: 100, max_rows_per_group: 10, + use_experimental_writer, ..Default::default() }; let dataset = Dataset::write(batches, test_uri, Some(write_params)) @@ -2343,15 +2361,19 @@ mod tests { for fragment in &fragments { assert_eq!(fragment.count_rows().await.unwrap(), 100); let reader = fragment.open(dataset.schema(), false).await.unwrap(); - assert_eq!(reader.legacy_num_batches(), 10); - for i in 0..reader.legacy_num_batches() as u32 { - assert_eq!(reader.legacy_num_rows_in_batch(i).unwrap(), 10); + // No group / batch concept in v2 + if !use_experimental_writer { + assert_eq!(reader.legacy_num_batches(), 10); + for i in 0..reader.legacy_num_batches() as u32 { + assert_eq!(reader.legacy_num_rows_in_batch(i).unwrap(), 10); + } } } } + #[rstest] #[tokio::test] - async fn test_write_manifest() { + async fn test_write_manifest(#[values(false, true)] use_experimental_writer: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -2367,7 +2389,14 @@ mod tests { .unwrap()]; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); - let write_fut = Dataset::write(batches, test_uri, None); + let write_fut = Dataset::write( + batches, + test_uri, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), + ); let write_fut = require_send(write_fut); let mut dataset = write_fut.await.unwrap(); @@ -2443,6 +2472,7 @@ mod tests { test_uri, Some(WriteParams { mode: WriteMode::Append, + use_experimental_writer, ..Default::default() }), ) @@ -2451,8 +2481,9 @@ mod tests { assert!(matches!(write_result, Err(Error::NotSupported { .. 
}))); } + #[rstest] #[tokio::test] - async fn append_dataset() { + async fn append_dataset(#[values(false, true)] use_experimental_writer: bool) { let test_dir = tempdir().unwrap(); let schema = Arc::new(ArrowSchema::new(vec![Field::new( @@ -2470,6 +2501,7 @@ mod tests { let mut write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, + use_experimental_writer, ..Default::default() }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -2528,8 +2560,9 @@ mod tests { ) } + #[rstest] #[tokio::test] - async fn test_self_dataset_append() { + async fn test_self_dataset_append(#[values(false, true)] use_experimental_writer: bool) { let test_dir = tempdir().unwrap(); let schema = Arc::new(ArrowSchema::new(vec![Field::new( @@ -2547,6 +2580,7 @@ mod tests { let mut write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, + use_experimental_writer, ..Default::default() }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -2609,8 +2643,11 @@ mod tests { actual_ds.validate().await.unwrap(); } + #[rstest] #[tokio::test] - async fn test_self_dataset_append_schema_different() { + async fn test_self_dataset_append_schema_different( + #[values(false, true)] use_experimental_writer: bool, + ) { let test_dir = tempdir().unwrap(); let schema = Arc::new(ArrowSchema::new(vec![Field::new( @@ -2639,6 +2676,7 @@ mod tests { let mut write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, + use_experimental_writer, ..Default::default() }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -2721,8 +2759,9 @@ mod tests { assert!(result.is_err()); } + #[rstest] #[tokio::test] - async fn overwrite_dataset() { + async fn overwrite_dataset(#[values(false, true)] use_experimental_writer: bool) { let test_dir = tempdir().unwrap(); let schema = Arc::new(ArrowSchema::new(vec![Field::new( @@ -2740,6 +2779,7 @@ mod tests { let mut write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, + use_experimental_writer, ..Default::default() }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -2805,8 +2845,9 @@ mod tests { assert_eq!(&ArrowSchema::from(first_ver.schema()), schema.as_ref()); } + #[rstest] #[tokio::test] - async fn test_take() { + async fn test_take(#[values(false, true)] use_experimental_writer: bool) { let test_dir = tempdir().unwrap(); let schema = Arc::new(ArrowSchema::new(vec![ @@ -2831,6 +2872,7 @@ mod tests { let write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, + use_experimental_writer, ..Default::default() }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -2875,10 +2917,13 @@ mod tests { ); } + #[rstest] #[tokio::test] - async fn test_take_rows_out_of_bound() { + async fn test_take_rows_out_of_bound(#[values(false, true)] use_experimental_writer: bool) { // a dataset with 1 fragment and 400 rows - let test_ds = TestVectorDataset::new().await.unwrap(); + let test_ds = TestVectorDataset::new(use_experimental_writer) + .await + .unwrap(); let ds = test_ds.dataset; // take the last row of first fragment @@ -2895,7 +2940,8 @@ mod tests { let indices = &[(1 << 32) - 3, (1 << 32) - 1]; let err = ds.take_rows(indices, ds.schema()).await.unwrap_err(); assert!( - err.to_string().contains("out of bounds"), + err.to_string() + .contains("Invalid read params Indices(4294967293,4294967295)"), "{}", err.to_string() ); @@ -2904,14 
+2950,16 @@ mod tests { let indices = &[(1 << 32) - 1, (1 << 32) - 3]; let err = ds.take_rows(indices, ds.schema()).await.unwrap_err(); assert!( - err.to_string().contains("out of bounds"), + err.to_string() + .contains("Invalid read params Indices(4294967293,4294967295)"), "{}", err.to_string() ); } + #[rstest] #[tokio::test] - async fn test_take_rows() { + async fn test_take_rows(#[values(false, true)] use_experimental_writer: bool) { let test_dir = tempdir().unwrap(); let schema = Arc::new(ArrowSchema::new(vec![ @@ -2936,6 +2984,7 @@ mod tests { let write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, + use_experimental_writer, ..Default::default() }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -2990,8 +3039,9 @@ mod tests { assert_eq!(RecordBatch::new_empty(schema.clone()), values); } + #[rstest] #[tokio::test] - async fn test_fast_count_rows() { + async fn test_fast_count_rows(#[values(false, true)] use_experimental_writer: bool) { let test_dir = tempdir().unwrap(); let schema = Arc::new(ArrowSchema::new(vec![Field::new( @@ -3014,6 +3064,7 @@ mod tests { let write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, + use_experimental_writer, ..Default::default() }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -3034,8 +3085,9 @@ mod tests { ); } + #[rstest] #[tokio::test] - async fn test_create_index() { + async fn test_create_index(#[values(false, true)] use_experimental_writer: bool) { let test_dir = tempdir().unwrap(); let dimension = 16; @@ -3061,7 +3113,16 @@ mod tests { let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); - let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), + ) + .await + .unwrap(); dataset.validate().await.unwrap(); // Make sure valid arguments should create index successfully @@ -3084,6 +3145,7 @@ mod tests { // Append should inherit index let write_params = WriteParams { mode: WriteMode::Append, + use_experimental_writer, ..Default::default() }; let batches = vec![RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap()]; @@ -3119,6 +3181,7 @@ mod tests { // Overwrite should invalidate index let write_params = WriteParams { mode: WriteMode::Overwrite, + use_experimental_writer, ..Default::default() }; let batches = vec![RecordBatch::try_new(schema.clone(), vec![vectors]).unwrap()]; @@ -3135,8 +3198,9 @@ mod tests { assert!(fragment_bitmap.contains(0)); } + #[rstest] #[tokio::test] - async fn test_create_scalar_index() { + async fn test_create_scalar_index(#[values(false, true)] use_experimental_writer: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -3145,7 +3209,10 @@ mod tests { let mut dataset = Dataset::write( data.into_reader_rows(RowCount::from(16 * 1024), BatchCount::from(4)), test_uri, - None, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), ) .await .unwrap(); @@ -3173,7 +3240,7 @@ mod tests { dataset.index_statistics(&index_name).await.unwrap(); } - async fn create_bad_file() -> Result { + async fn create_bad_file(use_experimental_writer: bool) -> Result { let test_dir = tempdir().unwrap(); let schema = Arc::new(ArrowSchema::new(vec![Field::new( @@ -3193,13 +3260,22 @@ mod tests { .collect(); let test_uri = test_dir.path().to_str().unwrap(); let reader = 
RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); - Dataset::write(reader, test_uri, None).await + Dataset::write( + reader, + test_uri, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), + ) + .await } + #[rstest] #[tokio::test] - async fn test_bad_field_name() { + async fn test_bad_field_name(#[values(false, true)] use_experimental_writer: bool) { // don't allow `.` in the field name - assert!(create_bad_file().await.is_err()); + assert!(create_bad_file(use_experimental_writer).await.is_err()); } #[tokio::test] @@ -3208,8 +3284,9 @@ mod tests { assert!(matches!(result.unwrap_err(), Error::DatasetNotFound { .. })); } + #[rstest] #[tokio::test] - async fn test_drop_columns() -> Result<()> { + async fn test_drop_columns(#[values(false, true)] use_experimental_writer: bool) -> Result<()> { let metadata: HashMap = [("k1".into(), "v1".into())].into(); let schema = Arc::new(ArrowSchema::new_with_metadata( @@ -3250,7 +3327,15 @@ mod tests { let test_uri = test_dir.path().to_str().unwrap(); let batches = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); - let mut dataset = Dataset::write(batches, test_uri, None).await?; + let mut dataset = Dataset::write( + batches, + test_uri, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), + ) + .await?; let lance_schema = dataset.schema().clone(); let original_fragments = dataset.fragments().to_vec(); @@ -3289,8 +3374,11 @@ mod tests { Ok(()) } + #[rstest] #[tokio::test] - async fn test_drop_add_columns() -> Result<()> { + async fn test_drop_add_columns( + #[values(false, true)] use_experimental_writer: bool, + ) -> Result<()> { let schema = Arc::new(ArrowSchema::new(vec![Field::new( "i", DataType::Int32, @@ -3303,7 +3391,15 @@ mod tests { let test_uri = test_dir.path().to_str().unwrap(); let batches = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); - let mut dataset = Dataset::write(batches, test_uri, None).await?; + let mut dataset = Dataset::write( + batches, + test_uri, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), + ) + .await?; assert_eq!(dataset.manifest.max_field_id(), 0); // Test we can add 1 column, drop it, then add another column. Validate @@ -3501,8 +3597,9 @@ mod tests { assert_eq!(actual, expected); } + #[rstest] #[tokio::test] - async fn test_delete() { + async fn test_delete(#[values(false, true)] use_experimental_writer: bool) { fn sequence_data(range: Range) -> RecordBatch { let schema = Arc::new(ArrowSchema::new(vec![ Field::new("i", DataType::UInt32, false), @@ -3528,7 +3625,7 @@ mod tests { let data = sequence_data(0..100); // Split over two files. 
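Reviewer note: the out-of-bounds assertions above now match against the new `Display` output for `ReadBatchParams::Indices`, which joins the indices with commas and no trailing separator. A small sketch of that formatting, as a hypothetical free function mirroring the `Display` arm added in `lance-io`:

```rust
use arrow_array::UInt32Array;

// Hypothetical helper mirroring the new Display arm for ReadBatchParams::Indices.
fn format_indices(indices: &UInt32Array) -> String {
    let mut s = indices.values().iter().fold(String::new(), |mut acc, v| {
        acc.push_str(&v.to_string());
        acc.push(',');
        acc
    });
    if !s.is_empty() {
        s.pop(); // drop the trailing comma
    }
    format!("Indices({})", s)
}

fn main() {
    let indices = UInt32Array::from(vec![4294967293u32, 4294967295u32]);
    // This is exactly the substring the updated take_rows tests assert on.
    assert_eq!(format_indices(&indices), "Indices(4294967293,4294967295)");
}
```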
let batches = vec![data.slice(0, 50), data.slice(50, 50)]; - let mut dataset = TestDatasetGenerator::new(batches) + let mut dataset = TestDatasetGenerator::new(batches, use_experimental_writer) .make_hostile(test_uri) .await; @@ -3662,8 +3759,9 @@ mod tests { assert_eq!(dataset.manifest.max_fragment_id(), Some(2)); } + #[rstest] #[tokio::test] - async fn test_restore() { + async fn test_restore(#[values(false, true)] use_experimental_writer: bool) { // Create a table let schema = Arc::new(ArrowSchema::new(vec![Field::new( "i", @@ -3679,7 +3777,16 @@ mod tests { vec![Arc::new(UInt32Array::from_iter_values(0..100))], ); let reader = RecordBatchIterator::new(vec![data.unwrap()].into_iter().map(Ok), schema); - let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), + ) + .await + .unwrap(); assert_eq!(dataset.manifest.version, 1); let original_manifest = dataset.manifest.clone(); @@ -3711,8 +3818,9 @@ mod tests { assert!(fragments[0].metadata.deletion_file.is_some()); } + #[rstest] #[tokio::test] - async fn test_search_empty() { + async fn test_search_empty(#[values(false, true)] use_experimental_writer: bool) { // Create a table let schema = Arc::new(ArrowSchema::new(vec![Field::new( "vec", @@ -3733,7 +3841,16 @@ mod tests { let data = RecordBatch::try_new(schema.clone(), vec![vectors]); let reader = RecordBatchIterator::new(vec![data.unwrap()].into_iter().map(Ok), schema); - let dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + let dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), + ) + .await + .unwrap(); let mut stream = dataset .scan() @@ -3768,8 +3885,9 @@ mod tests { } } + #[rstest] #[tokio::test] - async fn test_search_empty_after_delete() { + async fn test_search_empty_after_delete(#[values(false, true)] use_experimental_writer: bool) { // Create a table let schema = Arc::new(ArrowSchema::new(vec![Field::new( "vec", @@ -3790,7 +3908,16 @@ mod tests { let data = RecordBatch::try_new(schema.clone(), vec![vectors]); let reader = RecordBatchIterator::new(vec![data.unwrap()].into_iter().map(Ok), schema); - let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), + ) + .await + .unwrap(); dataset.delete("true").await.unwrap(); let mut stream = dataset @@ -3861,8 +3988,9 @@ mod tests { } } + #[rstest] #[tokio::test] - async fn test_num_small_files() { + async fn test_num_small_files(#[values(false, true)] use_experimental_writer: bool) { let test_dir = tempdir().unwrap(); let dimensions = 16; let column_name = "vec"; @@ -3887,7 +4015,16 @@ mod tests { RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone()); let test_uri = test_dir.path().to_str().unwrap(); - let dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + let dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), + ) + .await + .unwrap(); dataset.validate().await.unwrap(); assert!(dataset.num_small_files(1024).await > 0); @@ -3954,8 +4091,9 @@ mod tests { Ok(results) } + #[rstest] #[tokio::test] - async fn take_scan_dataset() { + async fn take_scan_dataset(#[values(false, true)] use_experimental_writer: bool) { let schema = 
Arc::new(ArrowSchema::new(vec![ Field::new("i", DataType::Int32, false), Field::new("x", DataType::Float32, false), @@ -3974,6 +4112,7 @@ mod tests { let write_params = WriteParams { max_rows_per_group: 2, + use_experimental_writer, ..Default::default() }; @@ -4048,8 +4187,9 @@ mod tests { Ok(test_dir) } + #[rstest] #[tokio::test] - async fn test_v0_7_5_migration() { + async fn test_v0_7_5_migration(#[values(false, true)] use_experimental_writer: bool) { // We migrate to add Fragment.physical_rows and DeletionFile.num_deletions // after this version. @@ -4078,6 +4218,7 @@ mod tests { let batches = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); let write_params = WriteParams { mode: WriteMode::Append, + use_experimental_writer, ..Default::default() }; let dataset = Dataset::write(batches, test_uri, Some(write_params)) @@ -4116,8 +4257,11 @@ mod tests { assert_eq!(actual, expected); } + #[rstest] #[tokio::test] - async fn test_fix_v0_8_0_broken_migration() { + async fn test_fix_v0_8_0_broken_migration( + #[values(false, true)] use_experimental_writer: bool, + ) { // The migration from v0.7.5 was broken in 0.8.0. This validates we can // automatically fix tables that have this problem. @@ -4147,6 +4291,7 @@ mod tests { let batches = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); let write_params = WriteParams { mode: WriteMode::Append, + use_experimental_writer, ..Default::default() }; let dataset = Dataset::write(batches, test_uri, Some(write_params)) @@ -4193,8 +4338,11 @@ mod tests { assert_eq!(actual, expected); } + #[rstest] #[tokio::test] - async fn test_v0_8_14_invalid_index_fragment_bitmap() { + async fn test_v0_8_14_invalid_index_fragment_bitmap( + #[values(false, true)] use_experimental_writer: bool, + ) { // Old versions of lance could create an index whose fragment bitmap was // invalid because it did not include fragments that were part of the index // @@ -4237,7 +4385,16 @@ mod tests { let broken_version = dataset.version().version; // Any transaction, no matter how simple, should trigger the fragment bitmap to be recalculated - dataset.append(data, None).await.unwrap(); + dataset + .append( + data, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), + ) + .await + .unwrap(); for idx in dataset.load_indices().await.unwrap().iter() { // The corrupt fragment_bitmap does not contain 0 but the @@ -4315,8 +4472,11 @@ mod tests { ); } + #[rstest] #[tokio::test] - async fn test_bfloat16_roundtrip() -> Result<()> { + async fn test_bfloat16_roundtrip( + #[values(false, true)] use_experimental_writer: bool, + ) -> Result<()> { let inner_field = Arc::new( Field::new("item", DataType::FixedSizeBinary(2), true).with_metadata( [ @@ -4345,7 +4505,10 @@ mod tests { let dataset = Dataset::write( RecordBatchIterator::new(vec![Ok(batch.clone())], schema.clone()), test_uri, - None, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), ) .await?; @@ -4371,7 +4534,15 @@ mod tests { let test_dir = tempdir()?; let test_uri = test_dir.path().to_str().unwrap(); - let mut dataset = Dataset::write(reader, test_uri, None).await?; + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + use_experimental_writer: false, + ..Default::default() + }), + ) + .await?; dataset.validate().await?; // Adding a duplicate column name will break @@ -4426,8 +4597,11 @@ mod tests { Ok(()) } + #[rstest] #[tokio::test] - async fn test_append_columns_udf() -> Result<()> { + async fn test_append_columns_udf( + #[values(false, true)] 
use_experimental_writer: bool, + ) -> Result<()> { let num_rows = 5; let schema = Arc::new(ArrowSchema::new(vec![Field::new( "id", @@ -4443,7 +4617,15 @@ mod tests { let test_dir = tempdir()?; let test_uri = test_dir.path().to_str().unwrap(); - let mut dataset = Dataset::write(reader, test_uri, None).await?; + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), + ) + .await?; dataset.validate().await?; // Adding a duplicate column name will break @@ -4545,6 +4727,7 @@ mod tests { Some(WriteParams { max_rows_per_file: 50, max_rows_per_group: 25, + use_experimental_writer: false, ..Default::default() }), ) @@ -4686,8 +4869,11 @@ mod tests { Ok(()) } + #[rstest] #[tokio::test] - async fn test_rename_columns() -> Result<()> { + async fn test_rename_columns( + #[values(false, true)] use_experimental_writer: bool, + ) -> Result<()> { let metadata: HashMap = [("k1".into(), "v1".into())].into(); let schema = Arc::new(ArrowSchema::new_with_metadata( @@ -4721,7 +4907,15 @@ mod tests { let test_uri = test_dir.path().to_str().unwrap(); let batches = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); - let mut dataset = Dataset::write(batches, test_uri, None).await?; + let mut dataset = Dataset::write( + batches, + test_uri, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), + ) + .await?; let original_fragments = dataset.fragments().to_vec(); @@ -4787,8 +4981,9 @@ mod tests { Ok(()) } + #[rstest] #[tokio::test] - async fn test_cast_column() -> Result<()> { + async fn test_cast_column(#[values(false, true)] use_experimental_writer: bool) -> Result<()> { // Create a table with 2 scalar columns, 1 vector column let schema = Arc::new(ArrowSchema::new(vec![ Field::new("i", DataType::Int32, false), @@ -4826,7 +5021,10 @@ mod tests { let mut dataset = Dataset::write( RecordBatchIterator::new(vec![Ok(batch.clone())], schema.clone()), test_uri, - None, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), ) .await?; diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 3895a4f62d..432e8fa7af 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -17,7 +17,7 @@ use arrow_schema::Schema as ArrowSchema; use datafusion::logical_expr::Expr; use datafusion::scalar::ScalarValue; use futures::future::try_join_all; -use futures::{join, stream, Future, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; +use futures::{join, stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use lance_core::utils::address::RowAddress; use lance_core::utils::deletion::DeletionVector; use lance_core::ROW_ID_FIELD; @@ -75,6 +75,13 @@ pub trait GenericFileReader: std::fmt::Debug + Send + Sync { batch_size: u32, projection: Arc, ) -> Result; + /// Take specific rows from the file, returning as a stream of tasks + fn take_all_tasks( + &self, + indices: &[u32], + batch_size: u32, + projection: Arc, + ) -> Result; /// Return the number of rows in the file fn len(&self) -> u32; @@ -186,6 +193,26 @@ impl GenericFileReader for FileReader { Ok(ranges_to_tasks(self, ranges, projection)) } + fn take_all_tasks( + &self, + indices: &[u32], + _batch_size: u32, + projection: Arc, + ) -> Result { + let indices_vec = indices.to_vec(); + let mut reader = self.clone(); + // In the new path the row id is added by the fragment and not the file + reader.with_row_id(false); + let task_fut = + async move { reader.take(&indices_vec, 
projection.as_ref(), None).await }.boxed(); + let task = std::future::ready(ReadBatchTask { + task: task_fut, + num_rows: indices.len() as u32, + }) + .boxed(); + Ok(futures::stream::once(task).boxed()) + } + /// Return the number of rows in the file fn len(&self) -> u32 { self.len() as u32 @@ -289,6 +316,24 @@ mod v2_adapter { .boxed()) } + fn take_all_tasks( + &self, + indices: &[u32], + batch_size: u32, + projection: Arc, + ) -> Result { + let indices = UInt32Array::from(indices.to_vec()); + let projection = self.projection_from_lance(projection.as_ref()); + Ok(self + .reader + .read_tasks(ReadBatchParams::Indices(indices), batch_size, &projection)? + .map(|v2_task| ReadBatchTask { + task: v2_task.task.map_err(Error::from).boxed(), + num_rows: v2_task.num_rows, + }) + .boxed()) + } + /// Return the number of rows in the file fn len(&self) -> u32 { self.reader.metadata().num_rows as u32 @@ -879,7 +924,8 @@ impl FileFragment { let range = (row_ids[0] as usize)..(row_ids[row_ids.len() - 1] as usize + 1); reader.legacy_read_range_as_batch(range).await } else { - reader.take(row_ids).await + // FIXME, change this method to streams + reader.take_as_batch(row_ids).await } } @@ -1224,6 +1270,9 @@ impl FragmentReader { self } + /// TODO: This method is relied upon by the v1 pushdown mechanism and will need to stay + /// in place until v1 is removed. v2 uses a different mechanism for pushdown and so there + /// is little benefit in updating the v1 pushdown node. pub(crate) fn legacy_num_batches(&self) -> usize { let legacy_reader = self.readers[0].0.as_legacy(); let num_batches = legacy_reader.num_batches(); @@ -1236,6 +1285,13 @@ impl FragmentReader { num_batches } + /// TODO: This method is relied upon by the v1 pushdown mechanism and will need to stay + /// in place until v1 is removed. v2 uses a different mechanism for pushdown and so there + /// is little benefit in updating the v1 pushdown node. + /// + /// This method is also used by the updater. Even though the updater has been updated to + /// use streams, the updater still needs to know the batch size in v1 so that it can create + /// files with the same batch size. pub(crate) fn legacy_num_rows_in_batch(&self, batch_id: u32) -> Option { if let Some(legacy_reader) = self.readers[0].0.as_legacy_opt() { if batch_id < legacy_reader.num_batches() as u32 { @@ -1249,6 +1305,10 @@ impl FragmentReader { } /// Read the page statistics of the fragment for the specified fields. + /// + /// TODO: This method is relied upon by the v1 pushdown mechanism and will need to stay + /// in place until v1 is removed. v2 uses a different mechanism for pushdown and so there + /// is little benefit in updating the v1 pushdown node. pub(crate) async fn legacy_read_page_stats( &self, projection: Option<&Schema>, @@ -1272,12 +1332,13 @@ impl FragmentReader { } } + #[cfg(test)] async fn read_impl<'a, Fut>( &'a self, read_fn: impl Fn(&'a dyn GenericFileReader, &'a Schema) -> Fut, ) -> Result where - Fut: Future> + 'a, + Fut: std::future::Future> + 'a, { let futures = self .readers @@ -1309,6 +1370,10 @@ impl FragmentReader { /// /// Note: the projection must be a subset of the schema the reader was created with. /// Otherwise incorrect data will be returned. + /// + /// TODO: This method is relied upon by the v1 pushdown mechanism and will need to stay + /// in place until v1 is removed. v2 uses a different mechanism for pushdown and so there + /// is little benefit in updating the v1 pushdown node. 
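Reviewer note: the legacy `FileReader` has no per-batch task API to delegate to, so its `take_all_tasks` boxes the single `take` future and wraps it in a one-element stream to match the v2 shape. A toy sketch of that wrapping, with a `Task` struct standing in for `ReadBatchTask` and a row count standing in for a `RecordBatch`:

```rust
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{FutureExt, StreamExt};

// Toy stand-in for ReadBatchTask: a boxed future plus the row count it will yield.
struct Task {
    task: BoxFuture<'static, usize>,
    num_rows: u32,
}

// Wrap one future in a single-element task stream, as the legacy take path does.
fn single_task_stream(rows: Vec<u32>) -> BoxStream<'static, Task> {
    let num_rows = rows.len() as u32;
    let task = async move { rows.len() }.boxed();
    futures::stream::once(std::future::ready(Task { task, num_rows })).boxed()
}

#[tokio::main]
async fn main() {
    let mut stream = single_task_stream(vec![1, 2, 3]);
    let t = stream.next().await.unwrap();
    assert_eq!(t.num_rows, 3);
    assert_eq!(t.task.await, 3);
}
```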
pub(crate) async fn legacy_read_batch_projected( &self, batch_id: usize, @@ -1379,7 +1444,7 @@ impl FragmentReader { if !params.valid_given_len(total_num_rows as usize) { return Err(Error::invalid_input( format!( - "Invalid read params {:?} for fragment with {} addressible rows", + "Invalid read params {} for fragment with {} addressible rows", params, total_num_rows ), location!(), @@ -1517,15 +1582,25 @@ impl FragmentReader { } /// Take rows from this fragment. - pub async fn take(&self, indices: &[u32]) -> Result { - self.read_impl(move |reader, schema| { - reader.as_legacy().take( - indices, - schema, - self.deletion_vec.as_ref().map(|dv| dv.as_ref()), - ) - }) - .await + pub async fn take(&self, indices: &[u32], batch_size: u32) -> Result { + let indices_arr = UInt32Array::from(indices.to_vec()); + self.new_read_impl( + ReadBatchParams::Indices(indices_arr), + batch_size, + move |reader, schema| reader.take_all_tasks(indices, batch_size, schema.clone()), + ) + } + + /// Take rows from this fragment, will perform a copy if the underlying reader returns multiple + /// batches. May return an error if the taken rows do not fit into a single batch. + pub async fn take_as_batch(&self, indices: &[u32]) -> Result { + let batches = self + .take(indices, u32::MAX) + .await? + .buffered(num_cpus::get()) + .try_collect::>() + .await?; + concat_batches(&Arc::new(self.output_schema.clone()), batches.iter()).map_err(Error::from) } } @@ -2217,7 +2292,7 @@ mod tests { .unwrap() .open(dataset.schema(), false) .await?; - let actual_data = reader.take(&[0, 1, 2]).await?; + let actual_data = reader.take_as_batch(&[0, 1, 2]).await?; assert_eq!(expected_data.slice(0, 3), actual_data); let actual_data = reader.legacy_read_range_as_batch(0..3).await?; diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 348603803b..ce2560a113 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -1536,7 +1536,7 @@ pub mod test_dataset { } impl TestVectorDataset { - pub async fn new() -> Result { + pub async fn new(use_experimental_writer: bool) -> Result { let tmp_dir = tempdir()?; let path = tmp_dir.path().to_str().unwrap(); @@ -1582,6 +1582,7 @@ pub mod test_dataset { let params = WriteParams { max_rows_per_group: 10, + use_experimental_writer, ..Default::default() }; let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -1665,6 +1666,7 @@ mod test { use lance_index::IndexType; use lance_io::object_store::ObjectStoreParams; use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector}; + use rstest::rstest; use tempfile::{tempdir, TempDir}; use super::*; @@ -1731,7 +1733,7 @@ mod test { #[tokio::test] async fn test_filter_parsing() -> Result<()> { - let test_ds = TestVectorDataset::new().await?; + let test_ds = TestVectorDataset::new(false).await?; let dataset = &test_ds.dataset; let mut scan = dataset.scan(); @@ -1762,9 +1764,10 @@ mod test { Ok(()) } + #[rstest] #[tokio::test] - async fn test_limit() -> Result<()> { - let test_ds = TestVectorDataset::new().await?; + async fn test_limit(#[values(false, true)] use_experimental_writer: bool) -> Result<()> { + let test_ds = TestVectorDataset::new(use_experimental_writer).await?; let dataset = &test_ds.dataset; let full_data = dataset.scan().try_into_batch().await?.slice(19, 2); @@ -1779,10 +1782,13 @@ mod test { Ok(()) } + #[rstest] #[tokio::test] - async fn test_knn_nodes() { + async fn test_knn_nodes(#[values(false, true)] use_experimental_writer: bool) { 
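Reviewer note: `take_as_batch` above drives the task stream returned by `take` with `buffered(num_cpus::get())` and stitches the per-task results back together with `concat_batches`. A self-contained sketch of that collect-and-concat step, using ready-made batches in place of file reads and assuming the `arrow-select` and `num_cpus` crates already in the workspace:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_select::concat::concat_batches;
use futures::{StreamExt, TryStreamExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
    // Each "task" is a future producing one RecordBatch, like a file-read task.
    let make_batch = |vals: Vec<i32>| {
        let schema = schema.clone();
        async move {
            RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vals)) as ArrayRef])
        }
    };

    let tasks = futures::stream::iter(vec![make_batch(vec![1, 2]), make_batch(vec![3])]);
    // Drive up to num_cpus tasks concurrently; `buffered` preserves output order.
    let batches: Vec<RecordBatch> = tasks.buffered(num_cpus::get()).try_collect().await?;
    let combined = concat_batches(&schema, batches.iter())?;
    assert_eq!(combined.num_rows(), 3);
    Ok(())
}
```

Because `buffered` (unlike `buffer_unordered`) keeps the input order, the concatenated rows come back in take order even though the tasks run concurrently.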
for build_index in &[true, false] { - let mut test_ds = TestVectorDataset::new().await.unwrap(); + let mut test_ds = TestVectorDataset::new(use_experimental_writer) + .await + .unwrap(); if *build_index { test_ds.make_vector_index().await.unwrap(); } @@ -1833,9 +1839,12 @@ mod test { } } + #[rstest] #[tokio::test] - async fn test_knn_with_new_data() { - let mut test_ds = TestVectorDataset::new().await.unwrap(); + async fn test_knn_with_new_data(#[values(false, true)] use_experimental_writer: bool) { + let mut test_ds = TestVectorDataset::new(use_experimental_writer) + .await + .unwrap(); test_ds.make_vector_index().await.unwrap(); test_ds.append_new_data().await.unwrap(); let dataset = &test_ds.dataset; @@ -1912,9 +1921,12 @@ mod test { } } + #[rstest] #[tokio::test] - async fn test_knn_with_prefilter() { - let mut test_ds = TestVectorDataset::new().await.unwrap(); + async fn test_knn_with_prefilter(#[values(false, true)] use_experimental_writer: bool) { + let mut test_ds = TestVectorDataset::new(use_experimental_writer) + .await + .unwrap(); test_ds.make_vector_index().await.unwrap(); let dataset = &test_ds.dataset; @@ -1968,12 +1980,15 @@ mod test { assert!(actual_i.is_subset(&close_i)); } + #[rstest] #[tokio::test] - async fn test_knn_filter_new_data() { + async fn test_knn_filter_new_data(#[values(false, true)] use_experimental_writer: bool) { // This test verifies that a filter (prefilter or postfilter) gets applied to the flat KNN results // in a combined KNN scan (a scan that combines results from an indexed ANN with an unindexed flat // search of new data) - let mut test_ds = TestVectorDataset::new().await.unwrap(); + let mut test_ds = TestVectorDataset::new(use_experimental_writer) + .await + .unwrap(); test_ds.make_vector_index().await.unwrap(); test_ds.append_new_data().await.unwrap(); let dataset = &test_ds.dataset; @@ -2031,9 +2046,12 @@ mod test { } } + #[rstest] #[tokio::test] - async fn test_knn_with_filter() { - let test_ds = TestVectorDataset::new().await.unwrap(); + async fn test_knn_with_filter(#[values(false, true)] use_experimental_writer: bool) { + let test_ds = TestVectorDataset::new(use_experimental_writer) + .await + .unwrap(); let dataset = &test_ds.dataset; let mut scan = dataset.scan(); @@ -2081,9 +2099,12 @@ mod test { assert_eq!(expected_i, actual_i); } + #[rstest] #[tokio::test] - async fn test_refine_factor() { - let test_ds = TestVectorDataset::new().await.unwrap(); + async fn test_refine_factor(#[values(false, true)] use_experimental_writer: bool) { + let test_ds = TestVectorDataset::new(use_experimental_writer) + .await + .unwrap(); let dataset = &test_ds.dataset; let mut scan = dataset.scan(); @@ -2132,7 +2153,10 @@ mod test { #[tokio::test] async fn test_scan_unordered_with_row_id() { - let test_ds = TestVectorDataset::new().await.unwrap(); + // This test doesn't make sense for v2 files, there is no way to get an out-of-order scan + let test_ds = TestVectorDataset::new(/*use_experimental_writer=*/ false) + .await + .unwrap(); let dataset = &test_ds.dataset; let mut scan = dataset.scan(); @@ -2178,8 +2202,9 @@ mod test { } } + #[rstest] #[tokio::test] - async fn test_scan_order() { + async fn test_scan_order(#[values(false, true)] use_experimental_writer: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -2203,6 +2228,7 @@ mod test { let params = WriteParams { mode: WriteMode::Append, + use_experimental_writer, ..Default::default() }; @@ -2247,8 +2273,9 @@ mod test { assert_eq!(output[1], batch1); } + 
#[rstest] #[tokio::test] - async fn test_scan_sort() { + async fn test_scan_sort(#[values(false, true)] use_experimental_writer: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -2280,7 +2307,10 @@ mod test { Dataset::write( data.into_reader_rows(RowCount::from(5), BatchCount::from(1)), test_uri, - None, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), ) .await .unwrap(); @@ -2330,8 +2360,9 @@ mod test { .unwrap(); } + #[rstest] #[tokio::test] - async fn test_sort_multi_columns() { + async fn test_sort_multi_columns(#[values(false, true)] use_experimental_writer: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -2355,7 +2386,10 @@ mod test { Dataset::write( data.into_reader_rows(RowCount::from(5), BatchCount::from(1)), test_uri, - None, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), ) .await .unwrap(); @@ -2379,8 +2413,9 @@ mod test { assert_eq!(batches_by_int_then_float[0], sorted_by_int_then_float); } + #[rstest] #[tokio::test] - async fn test_ann_prefilter() { + async fn test_ann_prefilter(#[values(false, true)] use_experimental_writer: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -2400,7 +2435,10 @@ mod test { ) .unwrap()]; - let write_params = WriteParams::default(); + let write_params = WriteParams { + use_experimental_writer, + ..Default::default() + }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); let mut dataset = Dataset::write(batches, test_uri, Some(write_params)) .await @@ -2447,8 +2485,9 @@ mod test { assert_eq!(6, first_match); } + #[rstest] #[tokio::test] - async fn test_filter_on_large_utf8() { + async fn test_filter_on_large_utf8(#[values(false, true)] use_experimental_writer: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -2466,7 +2505,10 @@ mod test { ) .unwrap()]; - let write_params = WriteParams::default(); + let write_params = WriteParams { + use_experimental_writer, + ..Default::default() + }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); Dataset::write(batches, test_uri, Some(write_params)) .await @@ -2496,8 +2538,9 @@ mod test { assert_eq!(batch, &expected); } + #[rstest] #[tokio::test] - async fn test_filter_with_regex() { + async fn test_filter_with_regex(#[values(false, true)] use_experimental_writer: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -2515,7 +2558,10 @@ mod test { ) .unwrap()]; - let write_params = WriteParams::default(); + let write_params = WriteParams { + use_experimental_writer, + ..Default::default() + }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); Dataset::write(batches, test_uri, Some(write_params)) .await @@ -2582,6 +2628,7 @@ mod test { let write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, + use_experimental_writer: false, ..Default::default() }; Dataset::write(batches, test_uri, Some(write_params)) @@ -2632,8 +2679,9 @@ mod test { concat_batches(&batches[0].schema(), &batches).unwrap(); } + #[rstest] #[tokio::test] - async fn test_ann_with_deletion() { + async fn test_ann_with_deletion(#[values(false, true)] use_experimental_writer: bool) { let vec_params = vec![ // TODO: re-enable diskann test when we can tune to get reproducible results. 
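Reviewer note: the string-filter tests above only change how the dataset is written; the scan side is untouched. For reference, the scan-with-filter pattern they exercise looks roughly like this (the predicate below is illustrative, not the exact one used in `test_filter_with_regex`):

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, RecordBatchIterator};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use lance::dataset::Dataset;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let test_dir = tempfile::tempdir()?;
    let test_uri = test_dir.path().to_str().unwrap();

    // A LargeUtf8 column, as in test_filter_on_large_utf8.
    let schema = Arc::new(ArrowSchema::new(vec![Field::new(
        "s",
        DataType::LargeUtf8,
        false,
    )]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(LargeStringArray::from(vec!["foo", "bar", "large_string"])) as ArrayRef],
    )?;
    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
    let dataset = Dataset::write(reader, test_uri, None).await?;

    // Filters are plain SQL-ish strings handed to the scanner.
    let mut scanner = dataset.scan();
    scanner.filter("s = 'large_string'")?;
    let result = scanner.try_into_batch().await?;
    assert_eq!(result.num_rows(), 1);
    Ok(())
}
```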
// VectorIndexParams::with_diskann_params(MetricType::L2, DiskANNParams::new(10, 1.5, 10)), @@ -2671,7 +2719,16 @@ mod test { .unwrap()]; let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); - let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), + ) + .await + .unwrap(); assert_eq!(dataset.index_cache_entry_count(), 0); dataset @@ -2766,6 +2823,7 @@ mod test { test_uri, Some(WriteParams { mode: WriteMode::Append, + use_experimental_writer, ..Default::default() }), ) @@ -2812,16 +2870,24 @@ mod test { } } + #[rstest] #[tokio::test] - async fn test_count_rows_with_filter() { + async fn test_count_rows_with_filter(#[values(false, true)] use_experimental_writer: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); let mut data_gen = BatchGenerator::new().col(Box::new( IncrementingInt32::new().named("Filter_me".to_owned()), )); - Dataset::write(data_gen.batch(32), test_uri, None) - .await - .unwrap(); + Dataset::write( + data_gen.batch(32), + test_uri, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), + ) + .await + .unwrap(); let dataset = Dataset::open(test_uri).await.unwrap(); assert_eq!(32, dataset.scan().count_rows().await.unwrap()); @@ -2837,15 +2903,23 @@ mod test { ); } + #[rstest] #[tokio::test] - async fn test_dynamic_projection() { + async fn test_dynamic_projection(#[values(false, true)] use_experimental_writer: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); let mut data_gen = BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("i".to_owned()))); - Dataset::write(data_gen.batch(32), test_uri, None) - .await - .unwrap(); + Dataset::write( + data_gen.batch(32), + test_uri, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), + ) + .await + .unwrap(); let dataset = Dataset::open(test_uri).await.unwrap(); assert_eq!(32, dataset.scan().count_rows().await.unwrap()); @@ -2874,15 +2948,23 @@ mod test { } } + #[rstest] #[tokio::test] - async fn test_column_casting_function() { + async fn test_column_casting_function(#[values(false, true)] use_experimental_writer: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); let mut data_gen = BatchGenerator::new().col(Box::new(RandomVector::new().named("vec".to_owned()))); - Dataset::write(data_gen.batch(32), test_uri, None) - .await - .unwrap(); + Dataset::write( + data_gen.batch(32), + test_uri, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), + ) + .await + .unwrap(); let dataset = Dataset::open(test_uri).await.unwrap(); assert_eq!(32, dataset.scan().count_rows().await.unwrap()); @@ -2959,7 +3041,7 @@ mod test { } impl ScalarIndexTestFixture { - async fn new() -> Self { + async fn new(use_experimental_writer: bool) -> Self { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -2984,6 +3066,7 @@ mod test { test_uri, Some(WriteParams { max_rows_per_file: 500, + use_experimental_writer, ..Default::default() }), ) @@ -3033,7 +3116,10 @@ mod test { dataset .append( RecordBatchIterator::new(vec![Ok(append_data)], data.schema()), - None, + Some(WriteParams { + use_experimental_writer, + ..Default::default() + }), ) .await .unwrap(); @@ -3351,9 +3437,10 @@ mod test { // There are many different ways that a query can be run 
and they all have slightly different // effects on the plan that gets built. This test attempts to run the same queries in various // different configurations to ensure that we get consistent results + #[rstest] #[tokio::test] - async fn test_secondary_index_scans() { - let fixture = ScalarIndexTestFixture::new().await; + async fn test_secondary_index_scans(#[values(false, true)] use_experimental_writer: bool) { + let fixture = ScalarIndexTestFixture::new(use_experimental_writer).await; for use_index in [false, true] { for use_projection in [false, true] { @@ -3436,8 +3523,9 @@ mod test { Ok(()) } + #[rstest] #[tokio::test] - async fn test_late_materialization() { + async fn test_late_materialization(#[values(false, true)] use_experimental_writer: bool) { // Create a large dataset with a scalar indexed column and a sorted but not scalar // indexed column let data = gen() @@ -3458,6 +3546,7 @@ mod test { object_store_wrapper: Some(io_stats_wrapper), ..Default::default() }), + use_experimental_writer, ..Default::default() }), ) @@ -3536,8 +3625,11 @@ mod test { assert!(second_index_scan_bytes < filtered_scan_bytes); } + #[rstest] #[tokio::test] - async fn test_project_nested() -> Result<()> { + async fn test_project_nested( + #[values(false, true)] use_experimental_writer: bool, + ) -> Result<()> { let struct_i_field = ArrowField::new("i", DataType::Int32, true); let struct_o_field = ArrowField::new("o", DataType::Utf8, true); let schema = Arc::new(ArrowSchema::new(vec![ @@ -3578,6 +3670,7 @@ mod test { let write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, + use_experimental_writer, ..Default::default() }; Dataset::write(batches, test_uri, Some(write_params)) @@ -3601,18 +3694,22 @@ mod test { Ok(()) } + #[rstest] #[tokio::test] - async fn test_plans() -> Result<()> { + async fn test_plans(#[values(false, true)] use_experimental_writer: bool) -> Result<()> { // Create a vector dataset - let mut dataset = TestVectorDataset::new().await?; + let mut dataset = TestVectorDataset::new(use_experimental_writer).await?; // Scans // --------------------------------------------------------------------- - assert_plan_equals( - &dataset.dataset, - |scan| scan.project(&["s"])?.filter("i > 10 and i < 20"), - "LancePushdownScan: uri=..., projection=[s], predicate=i > Int32(10) AND i < Int32(20), row_id=false, ordered=true" - ).await?; + // Experimental writer does not use LancePushdownScan + if !use_experimental_writer { + assert_plan_equals( + &dataset.dataset, + |scan| scan.project(&["s"])?.filter("i > 10 and i < 20"), + "LancePushdownScan: uri=..., projection=[s], predicate=i > Int32(10) AND i < Int32(20), row_id=false, ordered=true" + ).await?; + } assert_plan_equals( &dataset.dataset, diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 58326ec983..5be2357433 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use arrow_array::{RecordBatch, RecordBatchReader}; use datafusion::physical_plan::SendableRecordBatchStream; -use futures::{StreamExt, TryStreamExt}; +use futures::StreamExt; use lance_core::{datatypes::Schema, Error, Result}; use lance_datafusion::chunker::chunk_stream; use lance_datafusion::utils::{peek_reader_schema, reader_to_stream}; @@ -208,12 +208,10 @@ pub async fn write_fragments_internal( schema }; - // TODO: When writing v2 we could consider skipping this chunking step. 
However, leaving in - // for now as it doesn't hurt anything let mut buffered_reader = if params.use_experimental_writer { - data.and_then(|batch| std::future::ready(Ok(vec![batch]))) - .map_err(Error::from) - .boxed() + // In v2 we don't care about group size but we do want to chunk + // by max_rows_per_file + chunk_stream(data, params.max_rows_per_file) } else { chunk_stream(data, params.max_rows_per_group) }; diff --git a/rust/lance/src/session/index_extension.rs b/rust/lance/src/session/index_extension.rs index 0b6b1e6012..afcd3a76e9 100644 --- a/rust/lance/src/session/index_extension.rs +++ b/rust/lance/src/session/index_extension.rs @@ -77,6 +77,7 @@ mod test { use lance_linalg::distance::MetricType; use lance_table::io::manifest::ManifestDescribing; use roaring::RoaringBitmap; + use rstest::rstest; use serde_json::json; #[derive(Debug)] @@ -251,10 +252,15 @@ mod test { } } + #[rstest] #[tokio::test] - async fn test_vector_index_extension_roundtrip() { + async fn test_vector_index_extension_roundtrip( + #[values(false, true)] use_experimental_writer: bool, + ) { // make dataset and index that is not supported natively - let test_ds = TestVectorDataset::new().await.unwrap(); + let test_ds = TestVectorDataset::new(use_experimental_writer) + .await + .unwrap(); let idx = test_ds.dataset.load_indices().await.unwrap(); assert_eq!(idx.len(), 0); diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index bab55041b0..07362876e5 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -24,6 +24,7 @@ use tokio::io::AsyncWrite; use crate::dataset::fragment::write::FragmentCreateBuilder; use crate::dataset::transaction::Operation; +use crate::dataset::WriteParams; use crate::Dataset; /// A dataset generator that can generate random layouts. This is used to test @@ -34,15 +35,20 @@ use crate::Dataset; pub struct TestDatasetGenerator { seed: Option, data: Vec, + use_experimental_writer: bool, } impl TestDatasetGenerator { /// Create a new dataset generator with the given data. /// /// Each batch will become a separate fragment in the dataset. - pub fn new(data: Vec) -> Self { + pub fn new(data: Vec, use_experimental_writer: bool) -> Self { assert!(!data.is_empty()); - Self { data, seed: None } + Self { + data, + seed: None, + use_experimental_writer, + } } /// Set the seed for the random number generator. 
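Reviewer note: the write-path change above means v2 data is still chunked, just by `max_rows_per_file` instead of `max_rows_per_group`. A simplified, synchronous sketch of the regrouping that `chunk_stream` performs (the real implementation is async over a `SendableRecordBatchStream`):

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

// Slice and regroup incoming batches so no emitted chunk exceeds `max_rows`
// (max_rows_per_file for the v2 writer, max_rows_per_group for v1).
fn rechunk(batches: &[RecordBatch], max_rows: usize) -> Vec<Vec<RecordBatch>> {
    let mut chunks = Vec::new();
    let mut current = Vec::new();
    let mut rows_in_current = 0;
    for batch in batches {
        let mut offset = 0;
        while offset < batch.num_rows() {
            let take = (max_rows - rows_in_current).min(batch.num_rows() - offset);
            current.push(batch.slice(offset, take));
            rows_in_current += take;
            offset += take;
            if rows_in_current == max_rows {
                chunks.push(std::mem::take(&mut current));
                rows_in_current = 0;
            }
        }
    }
    if !current.is_empty() {
        chunks.push(current);
    }
    chunks
}

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from_iter_values(0..10)) as ArrayRef],
    )
    .unwrap();
    let chunks = rechunk(&[batch], 4);
    // 10 rows with at most 4 per chunk -> chunks of 4, 4, and 2 rows.
    let rows: Vec<usize> = chunks
        .iter()
        .map(|c| c.iter().map(|b| b.num_rows()).sum())
        .collect();
    assert_eq!(rows, vec![4, 4, 2]);
}
```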
@@ -188,6 +194,10 @@ impl TestDatasetGenerator { let reader = RecordBatchIterator::new(vec![Ok(data)], file_arrow_schema.clone()); let sub_frag = FragmentCreateBuilder::new(uri) .schema(&file_schema) + .write_params(&WriteParams { + use_experimental_writer: self.use_experimental_writer, + ..Default::default() + }) .write(reader, None) .await .unwrap(); @@ -385,9 +395,11 @@ mod tests { use super::*; use arrow_array::{ArrayRef, BooleanArray, Float64Array, Int32Array, StringArray, StructArray}; use arrow_schema::{DataType, Field as ArrowField, Fields as ArrowFields}; + use rstest::rstest; + #[rstest] #[test] - fn test_make_schema() { + fn test_make_schema(#[values(false, true)] use_experimental_writer: bool) { let arrow_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("a", DataType::Int32, false), ArrowField::new( @@ -405,7 +417,7 @@ mod tests { ])); let data = vec![RecordBatch::new_empty(arrow_schema.clone())]; - let generator = TestDatasetGenerator::new(data); + let generator = TestDatasetGenerator::new(data, use_experimental_writer); let schema = generator.make_schema(&mut rand::thread_rng()); let roundtripped_schema = ArrowSchema::from(&schema); @@ -427,8 +439,9 @@ mod tests { assert!(num_holes > 0, "Expected at least one hole in the field ids"); } + #[rstest] #[tokio::test] - async fn test_make_fragment() { + async fn test_make_fragment(#[values(false, true)] use_experimental_writer: bool) { let tmp_dir = tempfile::tempdir().unwrap(); let struct_fields: ArrowFields = vec![ @@ -458,7 +471,7 @@ mod tests { ) .unwrap(); - let generator = TestDatasetGenerator::new(vec![data.clone()]); + let generator = TestDatasetGenerator::new(vec![data.clone()], use_experimental_writer); let mut rng = rand::thread_rng(); for _ in 1..50 { let schema = generator.make_schema(&mut rng); @@ -488,8 +501,9 @@ mod tests { } } + #[rstest] #[tokio::test] - async fn test_make_hostile() { + async fn test_make_hostile(#[values(false, true)] use_experimental_writer: bool) { let tmp_dir = tempfile::tempdir().unwrap(); let schema = Arc::new(ArrowSchema::new(vec![ @@ -519,7 +533,7 @@ mod tests { ]; let seed = 42; - let generator = TestDatasetGenerator::new(data.clone()).seed(seed); + let generator = TestDatasetGenerator::new(data.clone(), use_experimental_writer).seed(seed); let path = tmp_dir.path().join("ds1"); let dataset = generator.make_hostile(path.to_str().unwrap()).await; @@ -541,7 +555,7 @@ mod tests { .map(|rb| rb.project(&projection).unwrap()) .collect::>(); - let generator = TestDatasetGenerator::new(data.clone()); + let generator = TestDatasetGenerator::new(data.clone(), use_experimental_writer); // Sample a few for i in 1..20 { let path = tmp_dir.path().join(format!("test_ds_{}_{}", num_cols, i));
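Reviewer note: with the constructor change, callers thread the writer flag through `TestDatasetGenerator` the same way the other tests thread it through `WriteParams`. A usage sketch, assuming it sits in this same test module (the generator is crate-internal) and that a minimal two-column layout gives `make_hostile` enough fields to shuffle:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
use rstest::rstest;

#[rstest]
#[tokio::test]
async fn hostile_layout_roundtrip(#[values(false, true)] use_experimental_writer: bool) {
    let tmp_dir = tempfile::tempdir().unwrap();

    let schema = Arc::new(ArrowSchema::new(vec![
        ArrowField::new("i", DataType::Int32, false),
        ArrowField::new("s", DataType::Utf8, false),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from_iter_values(0..20)) as ArrayRef,
            Arc::new(StringArray::from_iter_values((0..20).map(|v| v.to_string()))) as ArrayRef,
        ],
    )
    .unwrap();
    // Two batches -> two fragments, each with its own (possibly reordered) file schema.
    let data = vec![batch.slice(0, 10), batch.slice(10, 10)];

    let dataset = TestDatasetGenerator::new(data, use_experimental_writer)
        .seed(42)
        .make_hostile(tmp_dir.path().join("ds").to_str().unwrap())
        .await;

    // Regardless of which writer produced the files, a full scan returns all rows.
    let scanned = dataset.scan().try_into_batch().await.unwrap();
    assert_eq!(scanned.num_rows(), 20);
}
```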