diff --git a/Cargo.toml b/Cargo.toml
index 6ee3ddde99..0f01af92f8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,6 +6,7 @@ members = [
   "rust/lance-core",
   "rust/lance-datagen",
   "rust/lance-encoding",
+  "rust/lance-encoding-datafusion",
   "rust/lance-file",
   "rust/lance-index",
   "rust/lance-io",
@@ -48,6 +49,7 @@ lance-core = { version = "=0.11.1", path = "./rust/lance-core" }
 lance-datafusion = { version = "=0.11.1", path = "./rust/lance-datafusion" }
 lance-datagen = { version = "=0.11.1", path = "./rust/lance-datagen" }
 lance-encoding = { version = "=0.11.1", path = "./rust/lance-encoding" }
+lance-encoding-datafusion = { version = "=0.11.1", path = "./rust/lance-encoding-datafusion" }
 lance-file = { version = "=0.11.1", path = "./rust/lance-file" }
 lance-index = { version = "=0.11.1", path = "./rust/lance-index" }
 lance-io = { version = "=0.11.1", path = "./rust/lance-io" }
diff --git a/protos/encodings-df.proto b/protos/encodings-df.proto
new file mode 100644
index 0000000000..8318795014
--- /dev/null
+++ b/protos/encodings-df.proto
@@ -0,0 +1,31 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+syntax = "proto3";
+
+package lance.encodings_datafusion;
+
+import "encodings.proto";
+
+// ZoneMaps are a way to wrap any leaf array with a set of zone maps that
+// can be used to apply pushdown filtering.
+//
+// A "zone map" is the min/max/null_count of a set of rows. This can be
+// used to quickly filter out zones which are not included in simple predicates
+// like 'x = 5' or 'x > 10'.
+message ZoneMaps {
+
+  // How many rows are covered by each zone map. There will be
+  // ceil_div(num_rows, rows_per_map) zone maps.
+  uint32 rows_per_map = 1;
+
+  // The zone maps are encoded as struct arrays with 1 row per zone. This
+  // should be stored in a column metadata buffer.
+  // The struct array should have 3 children: min: T, max: T, null_count: u32
+  lance.encodings.ArrayEncoding stats = 2;
+
+  // The underlying array values
+  lance.encodings.ArrayEncoding values = 5;
+}
diff --git a/python/src/file.rs b/python/src/file.rs
index f862769752..8efb0a7dd4 100644
--- a/python/src/file.rs
+++ b/python/src/file.rs
@@ -185,6 +185,7 @@ impl LanceFileWriter {
             FileWriterOptions {
                 data_cache_bytes,
                 keep_original_array,
+                ..Default::default()
             },
         )
         .infer_error()?;
diff --git a/rust/lance-encoding-datafusion/Cargo.toml b/rust/lance-encoding-datafusion/Cargo.toml
new file mode 100644
index 0000000000..5f3b7e1ddf
--- /dev/null
+++ b/rust/lance-encoding-datafusion/Cargo.toml
@@ -0,0 +1,37 @@
+[package]
+name = "lance-encoding-datafusion"
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+license.workspace = true
+repository.workspace = true
+readme = "README.md"
+description = "Encoders and decoders for the Lance file format that rely on DataFusion"
+keywords.workspace = true
+categories.workspace = true
+rust-version.workspace = true
+
+[dependencies]
+lance-core = { workspace = true, features = ["datafusion"] }
+lance-encoding.workspace = true
+lance-file.workspace = true
+arrow-array.workspace = true
+arrow-buffer.workspace = true
+arrow-schema.workspace = true
+datafusion-common.workspace = true
+datafusion-expr.workspace = true
+datafusion-physical-expr.workspace = true
+futures.workspace = true
+prost.workspace = true
+prost-types.workspace = true
+
+[dev-dependencies]
+rand.workspace = true
+tokio.workspace = true
+lance-datagen.workspace = true
+
+[build-dependencies]
+prost-build.workspace = true
+
+[target.'cfg(target_os = "linux")'.dev-dependencies]
+pprof = { workspace = true }
diff --git a/rust/lance-encoding-datafusion/README.md b/rust/lance-encoding-datafusion/README.md
new file mode 100644
index 0000000000..b10ff11eb2
--- /dev/null
+++ b/rust/lance-encoding-datafusion/README.md
@@ -0,0 +1,8 @@
+# lance-encoding-datafusion
+
+`lance-encoding-datafusion` is an internal sub-crate containing encoders and
+decoders for the Lance file format that rely on DataFusion. This is partly to
+keep `lance-encoding` small and partly to prove that encodings are extensible.
+
+**Important Note**: This crate is **not intended for external usage**.
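The pushdown idea in `encodings-df.proto` is easiest to see with a toy example. The following is a minimal sketch (not part of this patch) of how a reader could use per-zone min/max statistics to prune zones for a predicate like the proto comment's `x = 5`; the `ZoneStats` type and `prune_eq` helper are hypothetical, and a real zone also carries `null_count` so that predicates like `x IS NULL` can be answered the same way.

```rust
/// Hypothetical mirror of one decoded zone map entry.
struct ZoneStats {
    min: i64,
    max: i64,
}

/// Returns the indices of zones that may contain rows where `x == value`.
/// A zone is skipped when `value` falls outside its [min, max] range, since
/// no row in that zone can possibly match.
fn prune_eq(zones: &[ZoneStats], value: i64) -> Vec<usize> {
    zones
        .iter()
        .enumerate()
        .filter(|(_, zone)| zone.min <= value && value <= zone.max)
        .map(|(idx, _)| idx)
        .collect()
}

fn main() {
    // Two zones of 100 rows each covering values [0, 99] and [100, 199].
    let zones = vec![
        ZoneStats { min: 0, max: 99 },
        ZoneStats { min: 100, max: 199 },
    ];
    // 'x = 150' can only match rows in the second zone; the first is skipped.
    assert_eq!(prune_eq(&zones, 150), vec![1]);
}
```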
diff --git a/rust/lance-encoding-datafusion/build.rs b/rust/lance-encoding-datafusion/build.rs
new file mode 100644
index 0000000000..8d89a39ac3
--- /dev/null
+++ b/rust/lance-encoding-datafusion/build.rs
@@ -0,0 +1,16 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+use std::io::Result;
+
+fn main() -> Result<()> {
+    println!("cargo:rerun-if-changed=protos");
+
+    let mut prost_build = prost_build::Config::new();
+    prost_build.extern_path(".lance.encodings", "::lance_encoding::format::pb");
+    prost_build.protoc_arg("--experimental_allow_proto3_optional");
+    prost_build.enable_type_names();
+    prost_build.compile_protos(&["./protos/encodings-df.proto"], &["./protos"])?;
+
+    Ok(())
+}
diff --git a/rust/lance-encoding-datafusion/protos b/rust/lance-encoding-datafusion/protos
new file mode 120000
index 0000000000..69d0d0d54b
--- /dev/null
+++ b/rust/lance-encoding-datafusion/protos
@@ -0,0 +1 @@
+../../protos
\ No newline at end of file
diff --git a/rust/lance-encoding-datafusion/src/format.rs b/rust/lance-encoding-datafusion/src/format.rs
new file mode 100644
index 0000000000..4c9ded86a9
--- /dev/null
+++ b/rust/lance-encoding-datafusion/src/format.rs
@@ -0,0 +1,15 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+/// Protobuf definitions for encodings
+pub mod pb {
+    #![allow(clippy::all)]
+    #![allow(non_upper_case_globals)]
+    #![allow(non_camel_case_types)]
+    #![allow(non_snake_case)]
+    #![allow(unused)]
+    #![allow(improper_ctypes)]
+    #![allow(clippy::upper_case_acronyms)]
+    #![allow(clippy::use_self)]
+    include!(concat!(env!("OUT_DIR"), "/lance.encodings_datafusion.rs"));
+}
diff --git a/rust/lance-encoding-datafusion/src/lib.rs b/rust/lance-encoding-datafusion/src/lib.rs
new file mode 100644
index 0000000000..199c925bd4
--- /dev/null
+++ b/rust/lance-encoding-datafusion/src/lib.rs
@@ -0,0 +1,63 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+use arrow_schema::DataType;
+use lance_encoding::encoder::{
+    ColumnIndexSequence, CoreFieldEncodingStrategy, FieldEncodingStrategy,
+};
+use zone::ZoneMapsFieldEncoder;
+
+pub mod format;
+pub mod zone;
+
+/// Wraps the core encoding strategy and adds the encoders from this
+/// crate
+#[derive(Debug)]
+pub struct LanceDfFieldEncodingStrategy {
+    core: CoreFieldEncodingStrategy,
+    rows_per_map: u32,
+}
+
+impl FieldEncodingStrategy for LanceDfFieldEncodingStrategy {
+    fn create_field_encoder(
+        &self,
+        encoding_strategy_root: &dyn FieldEncodingStrategy,
+        field: &lance_core::datatypes::Field,
+        column_index: &mut ColumnIndexSequence,
+        cache_bytes_per_column: u64,
+        keep_original_array: bool,
+        config: &std::collections::HashMap<String, String>,
+    ) -> lance_core::Result<Box<dyn lance_encoding::encoder::FieldEncoder>> {
+        let data_type = field.data_type();
+        if data_type.is_primitive()
+            || matches!(
+                data_type,
+                DataType::Boolean | DataType::Utf8 | DataType::LargeUtf8
+            )
+        {
+            let inner_encoder = self.core.create_field_encoder(
+                // Use the core strategy as the root here so that inner fields
+                // (e.g. the bytes behind a string column) don't get wrapped
+                // with zone maps of their own
+                &self.core,
+                field,
+                column_index,
+                cache_bytes_per_column,
+                keep_original_array,
+                config,
+            )?;
+            Ok(Box::new(ZoneMapsFieldEncoder::try_new(
+                inner_encoder,
+                data_type.clone(),
+                self.rows_per_map,
+            )?))
+        } else {
+            self.core.create_field_encoder(
+                encoding_strategy_root,
+                field,
+                column_index,
+                cache_bytes_per_column,
+                keep_original_array,
+                config,
+            )
+        }
+    }
+}
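As wired above, `LanceDfFieldEncodingStrategy` keeps its fields private and the patch does not yet expose a constructor, so the following is only a sketch of the intended wiring into the writer via the `encoding_strategy` option that this diff adds to `FileWriterOptions` further down; the `new` constructor shown here is hypothetical.

```rust
use std::sync::Arc;

use lance_encoding_datafusion::LanceDfFieldEncodingStrategy;
use lance_file::v2::writer::FileWriterOptions;

fn options_with_zone_maps() -> FileWriterOptions {
    // Hypothetical constructor: wraps CoreFieldEncodingStrategy::default()
    // and records how many rows each zone map should cover.
    let strategy = LanceDfFieldEncodingStrategy::new(/* rows_per_map */ 1024);
    FileWriterOptions {
        encoding_strategy: Some(Arc::new(strategy)),
        ..Default::default()
    }
}
```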
diff --git a/rust/lance-encoding-datafusion/src/zone.rs b/rust/lance-encoding-datafusion/src/zone.rs
new file mode 100644
index 0000000000..68e044df7b
--- /dev/null
+++ b/rust/lance-encoding-datafusion/src/zone.rs
@@ -0,0 +1,221 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, RecordBatch, UInt32Array};
+use arrow_buffer::Buffer;
+use arrow_schema::{Field, Schema};
+use datafusion_common::{arrow::datatypes::DataType, ScalarValue};
+use datafusion_expr::Accumulator;
+use datafusion_physical_expr::expressions::{MaxAccumulator, MinAccumulator};
+use futures::{future::BoxFuture, FutureExt};
+use lance_encoding::encoder::{
+    encode_batch, CoreFieldEncodingStrategy, EncodeTask, EncodedBuffer, FieldEncoder,
+};
+
+use lance_core::Result;
+use lance_file::v2::writer::EncodedBatchWriteExt;
+
+struct CreatedZoneMap {
+    min: ScalarValue,
+    max: ScalarValue,
+    null_count: u32,
+}
+
+/// A field encoder that creates zone maps for the data it encodes
+///
+/// The zone maps are created by dividing the data into zones of a fixed size
+/// and calculating the min/max/null_count for each zone. The zone maps are
+/// then encoded as column metadata.
+///
+/// This metadata can be used by the reader to skip over zones that don't
+/// contain data matching the query.
+pub struct ZoneMapsFieldEncoder {
+    items_encoder: Box<dyn FieldEncoder>,
+    items_type: DataType,
+
+    rows_per_map: u32,
+
+    maps: Vec<CreatedZoneMap>,
+    cur_offset: u32,
+    min: MinAccumulator,
+    max: MaxAccumulator,
+    null_count: u32,
+}
+
+impl ZoneMapsFieldEncoder {
+    pub fn try_new(
+        items_encoder: Box<dyn FieldEncoder>,
+        items_type: DataType,
+        rows_per_map: u32,
+    ) -> Result<Self> {
+        let min = MinAccumulator::try_new(&items_type)?;
+        let max = MaxAccumulator::try_new(&items_type)?;
+        Ok(Self {
+            rows_per_map,
+            items_encoder,
+            items_type,
+            min,
+            max,
+            null_count: 0,
+            cur_offset: 0,
+            maps: Vec::new(),
+        })
+    }
+}
+
+impl ZoneMapsFieldEncoder {
+    fn new_map(&mut self) -> Result<()> {
+        // TODO: We should be truncating the min/max values here
+        let map = CreatedZoneMap {
+            min: self.min.evaluate()?,
+            max: self.max.evaluate()?,
+            null_count: self.null_count,
+        };
+        self.maps.push(map);
+        self.min = MinAccumulator::try_new(&self.items_type)?;
+        self.max = MaxAccumulator::try_new(&self.items_type)?;
+        self.null_count = 0;
+        self.cur_offset = 0;
+        Ok(())
+    }
+
+    fn update_stats(&mut self, array: &ArrayRef) -> Result<()> {
+        self.null_count += array.null_count() as u32;
+        self.min.update_batch(&[array.clone()])?;
+        self.max.update_batch(&[array.clone()])?;
+        Ok(())
+    }
+
+    fn update(&mut self, array: &ArrayRef) -> Result<()> {
+        let mut remaining = array.len() as u32;
+        let mut offset = 0;
+
+        while remaining > 0 {
+            let desired = self.rows_per_map - self.cur_offset;
+            if desired > remaining {
+                // Not enough data to fill a map, increment counts and return
+                self.update_stats(&array.slice(offset, remaining as usize))?;
+                self.cur_offset += remaining;
+                break;
+            } else {
+                // We have enough data to fill a map
+                self.update_stats(&array.slice(offset, desired as usize))?;
+                self.new_map()?;
+            }
+            offset += desired as usize;
+            remaining = remaining.saturating_sub(desired);
+        }
+        Ok(())
+    }
+
+    async fn maps_to_metadata(&mut self) -> Result<Vec<EncodedBuffer>> {
+        let maps = std::mem::take(&mut self.maps);
+        let (mins, (maxes, null_counts)): (Vec<_>, (Vec<_>, Vec<_>)) = maps
+            .into_iter()
+            .map(|mp| (mp.min, (mp.max, mp.null_count)))
+            .unzip();
+        let mins = ScalarValue::iter_to_array(mins.into_iter())?;
+        let maxes = ScalarValue::iter_to_array(maxes.into_iter())?;
+        let null_counts = Arc::new(UInt32Array::from_iter_values(null_counts.into_iter()));
+        let zone_map_schema = Arc::new(Schema::new(vec![
+            Field::new("min", mins.data_type().clone(), true),
+            Field::new("max", maxes.data_type().clone(), true),
+            Field::new("null_count", DataType::UInt32, false),
+        ]));
+        let zone_maps = RecordBatch::try_new(zone_map_schema, vec![mins, maxes, null_counts])?;
+        let encoding_strategy = CoreFieldEncodingStrategy::default();
+        let encoded_zone_maps = encode_batch(&zone_maps, &encoding_strategy, u64::MAX).await?;
+        let zone_maps_buffer = encoded_zone_maps.try_to_mini_lance()?;
+        Ok(vec![EncodedBuffer {
+            parts: vec![Buffer::from(zone_maps_buffer)],
+        }])
+    }
+}
+
+impl FieldEncoder for ZoneMapsFieldEncoder {
+    fn maybe_encode(&mut self, array: ArrayRef) -> Result<Vec<EncodeTask>> {
+        // TODO: If we do the zone map calculation as part of the encoding task then we can
+        // parallelize statistics gathering. Could be faster too since the encoding task is
+        // going to need to access the same data (although the input to an encoding task is
+        // probably too big for the CPU cache anyways). We can worry about this if we need
+        // to improve write speed.
+        self.update(&array)?;
+        self.items_encoder.maybe_encode(array)
+    }
+
+    fn flush(&mut self) -> Result<Vec<EncodeTask>> {
+        if self.cur_offset > 0 {
+            // Create final map
+            self.new_map()?;
+        }
+        self.items_encoder.flush()
+    }
+
+    fn finish(&mut self) -> BoxFuture<'_, Result<Vec<EncodedBuffer>>> {
+        async move { self.maps_to_metadata().await }.boxed()
+    }
+
+    fn num_columns(&self) -> u32 {
+        self.items_encoder.num_columns()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use arrow_array::types::Int32Type;
+    use arrow_schema::DataType;
+    use lance_datagen::{BatchCount, RowCount};
+    use lance_encoding::encoder::{
+        ColumnIndexSequence, CoreFieldEncodingStrategy, FieldEncoder, FieldEncodingStrategy,
+    };
+
+    #[tokio::test]
+    async fn test_basic_stats() {
+        let encoding_strategy = CoreFieldEncodingStrategy::default();
+        let mut col_idx_seq = ColumnIndexSequence::default();
+        let mock_field = lance_core::datatypes::Field::try_from(arrow_schema::Field::new(
+            "foo",
+            DataType::Int32,
+            false,
+        ))
+        .unwrap();
+        let inner = encoding_strategy
+            .create_field_encoder(
+                &encoding_strategy,
+                &mock_field,
+                &mut col_idx_seq,
+                4096,
+                true,
+                &HashMap::new(),
+            )
+            .unwrap();
+        let mut encoder =
+            super::ZoneMapsFieldEncoder::try_new(inner, DataType::Int32, 100).unwrap();
+
+        let gen = lance_datagen::gen()
+            .anon_col(lance_datagen::array::step::<Int32Type>())
+            .into_reader_rows(RowCount::from(1024), BatchCount::from(7));
+
+        for batch in gen {
+            let batch = batch.unwrap();
+            let array = batch.column(0);
+            encoder.maybe_encode(array.clone()).unwrap();
+        }
+
+        let zone_maps_buffer = encoder.finish().await.unwrap();
+        assert_eq!(zone_maps_buffer.len(), 1);
+        let zone_maps_buffer = zone_maps_buffer.into_iter().next().unwrap();
+        assert_eq!(zone_maps_buffer.parts.len(), 1);
+        let zone_maps_buffer = zone_maps_buffer.parts.into_iter().next().unwrap();
+        // TODO: Once reading is available we can check the contents of the zone maps buffer
+        // TODO: Test out the different types
+        assert!(!zone_maps_buffer.is_empty());
+    }
+}
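Rather than hand-rolling per-type comparisons, `ZoneMapsFieldEncoder` leans on DataFusion's `MinAccumulator`/`MaxAccumulator`. A self-contained sketch of that pattern, using only crates this sub-crate already depends on:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array};
use datafusion_common::{arrow::datatypes::DataType, Result, ScalarValue};
use datafusion_expr::Accumulator;
use datafusion_physical_expr::expressions::{MaxAccumulator, MinAccumulator};

fn main() -> Result<()> {
    let mut min = MinAccumulator::try_new(&DataType::Int32)?;
    let mut max = MaxAccumulator::try_new(&DataType::Int32)?;

    // Feed one zone's worth of rows; nulls are ignored by min/max.
    let zone: ArrayRef = Arc::new(Int32Array::from(vec![Some(5), None, Some(42), Some(-3)]));
    min.update_batch(&[zone.clone()])?;
    max.update_batch(&[zone])?;

    // evaluate() yields a ScalarValue, which is exactly what CreatedZoneMap stores.
    assert_eq!(min.evaluate()?, ScalarValue::Int32(Some(-3)));
    assert_eq!(max.evaluate()?, ScalarValue::Int32(Some(42)));
    Ok(())
}
```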
diff --git a/rust/lance-encoding/benches/decoder.rs b/rust/lance-encoding/benches/decoder.rs
index 60120d7544..ace4cdf909 100644
--- a/rust/lance-encoding/benches/decoder.rs
+++ b/rust/lance-encoding/benches/decoder.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
 
 use arrow_schema::{DataType, Field, TimeUnit};
 use criterion::{criterion_group, criterion_main, Criterion};
-use lance_encoding::encoder::encode_batch;
+use lance_encoding::encoder::{encode_batch, CoreFieldEncodingStrategy};
 
 const PRIMITIVE_TYPES: &[DataType] = &[
     DataType::Date32,
@@ -57,7 +57,10 @@ fn bench_decode(c: &mut Criterion) {
         .unwrap();
     let input_bytes = data.get_array_memory_size();
     group.throughput(criterion::Throughput::Bytes(input_bytes as u64));
-    let encoded = rt.block_on(encode_batch(&data, 1024 * 1024)).unwrap();
+    let encoding_strategy = CoreFieldEncodingStrategy::default();
+    let encoded = rt
+        .block_on(encode_batch(&data, &encoding_strategy, 1024 * 1024))
+        .unwrap();
     let func_name = format!("{:?}", data_type).to_lowercase();
     group.bench_function(func_name, |b| {
         b.iter(|| {
@@ -82,7 +85,10 @@ fn bench_decode_fsl(c: &mut Criterion) {
         .unwrap();
     let input_bytes = data.get_array_memory_size();
     group.throughput(criterion::Throughput::Bytes(input_bytes as u64));
-    let encoded = rt.block_on(encode_batch(&data, 1024 * 1024)).unwrap();
+    let encoding_strategy = CoreFieldEncodingStrategy::default();
+    let encoded = rt
+        .block_on(encode_batch(&data, &encoding_strategy, 1024 * 1024))
+        .unwrap();
     let func_name = format!("{:?}", data_type).to_lowercase();
     group.bench_function(func_name, |b| {
         b.iter(|| {
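The `encoder.rs` changes that follow extend `FieldEncoder` with a `finish` hook, which is what lets `ZoneMapsFieldEncoder` hand back column metadata after all pages are encoded. As a hedged sketch of the lifecycle, here is a hypothetical pass-through wrapper (per the trait: `maybe_encode` for each incoming array, `flush` to drain buffers, then `finish` exactly once):

```rust
use arrow_array::{Array, ArrayRef};
use futures::{future::BoxFuture, FutureExt};
use lance_core::Result;
use lance_encoding::encoder::{EncodeTask, EncodedBuffer, FieldEncoder};

/// Hypothetical wrapper that forwards everything to an inner encoder while
/// counting rows; a real wrapper (like ZoneMapsFieldEncoder) would turn its
/// observations into column metadata buffers in finish().
struct CountingEncoder {
    inner: Box<dyn FieldEncoder>,
    rows_seen: u64,
}

impl FieldEncoder for CountingEncoder {
    fn maybe_encode(&mut self, array: ArrayRef) -> Result<Vec<EncodeTask>> {
        self.rows_seen += array.len() as u64;
        self.inner.maybe_encode(array)
    }

    fn flush(&mut self) -> Result<Vec<EncodeTask>> {
        self.inner.flush()
    }

    fn finish(&mut self) -> BoxFuture<'_, Result<Vec<EncodedBuffer>>> {
        // This sketch emits no metadata, mirroring the trait's new default.
        std::future::ready(Ok(vec![])).boxed()
    }

    fn num_columns(&self) -> u32 {
        self.inner.num_columns()
    }
}
```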
diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs
index 2e3c2e5fa8..f2afcb9070 100644
--- a/rust/lance-encoding/src/encoder.rs
+++ b/rust/lance-encoding/src/encoder.rs
@@ -1,20 +1,25 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright The Lance Authors
 
-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};
 
 use arrow_array::{ArrayRef, RecordBatch};
 use arrow_buffer::Buffer;
 use arrow_schema::DataType;
 use bytes::{Bytes, BytesMut};
 use futures::future::BoxFuture;
+use futures::FutureExt;
 use lance_core::datatypes::{Field, Schema};
 use lance_core::Result;
 
+use crate::encodings::physical::value::{parse_compression_scheme, CompressionScheme};
 use crate::{
     decoder::{ColumnInfo, PageInfo},
-    encodings::logical::{
-        binary::BinaryFieldEncoder, list::ListFieldEncoder, primitive::PrimitiveFieldEncoder,
-        r#struct::StructFieldEncoder,
+    encodings::{
+        logical::{
+            binary::BinaryFieldEncoder, list::ListFieldEncoder, primitive::PrimitiveFieldEncoder,
+            r#struct::StructFieldEncoder,
+        },
+        physical::{basic::BasicEncoder, fixed_size_list::FslEncoder, value::ValueEncoder},
     },
     format::pb,
 };
@@ -153,24 +158,134 @@ pub trait FieldEncoder: Send {
     /// Flush any remaining data from the buffers into encoding tasks
     ///
     /// This may be called intermittently throughout encoding but will always be called
-    /// once at the end of encoding.
+    /// once at the end of encoding, just before calling finish
     fn flush(&mut self) -> Result<Vec<EncodeTask>>;
+    /// Finish encoding and return column metadata buffers
+    ///
+    /// This is called only once, after all encode tasks have completed
+    ///
+    /// By default, returns an empty Vec (no column metadata buffers)
+    fn finish(&mut self) -> BoxFuture<'_, Result<Vec<EncodedBuffer>>> {
+        std::future::ready(Ok(vec![])).boxed()
+    }
     /// The number of output columns this encoding will create
     fn num_columns(&self) -> u32;
 }
 
-pub struct BatchEncoder {
-    pub field_encoders: Vec<Box<dyn FieldEncoder>>,
-    pub field_id_to_column_index: Vec<(i32, i32)>,
+/// A trait to pick which encoding strategy to use for a single page
+/// of data
+///
+/// Presumably, implementations will make encoding decisions based on
+/// array statistics.
+pub trait ArrayEncodingStrategy: Send + Sync + std::fmt::Debug {
+    fn create_array_encoder(&self, arrays: &[ArrayRef]) -> Result<Box<dyn ArrayEncoder>>;
 }
 
-impl BatchEncoder {
-    pub(crate) fn get_encoder_for_field(
+/// The core array encoding strategy is a set of basic encodings that
+/// are generally applicable in most scenarios.
+#[derive(Debug, Default)]
+pub struct CoreArrayEncodingStrategy;
+
+fn get_compression_scheme() -> CompressionScheme {
+    let compression_scheme = std::env::var("LANCE_PAGE_COMPRESSION").unwrap_or("none".to_string());
+    parse_compression_scheme(&compression_scheme).unwrap_or(CompressionScheme::None)
+}
+
+impl CoreArrayEncodingStrategy {
+    fn array_encoder_from_type(data_type: &DataType) -> Result<Box<dyn ArrayEncoder>> {
+        match data_type {
+            DataType::FixedSizeList(inner, dimension) => {
+                Ok(Box::new(BasicEncoder::new(Box::new(FslEncoder::new(
+                    Self::array_encoder_from_type(inner.data_type())?,
+                    *dimension as u32,
+                )))))
+            }
+            _ => Ok(Box::new(BasicEncoder::new(Box::new(
+                ValueEncoder::try_new(data_type, get_compression_scheme())?,
+            )))),
+        }
+    }
+}
+
+impl ArrayEncodingStrategy for CoreArrayEncodingStrategy {
+    fn create_array_encoder(&self, arrays: &[ArrayRef]) -> Result<Box<dyn ArrayEncoder>> {
+        Self::array_encoder_from_type(arrays[0].data_type())
+    }
+}
+
+/// Keeps track of the current column index and makes a mapping
+/// from field id to column index
+#[derive(Default)]
+pub struct ColumnIndexSequence {
+    current_index: u32,
+    mapping: Vec<(i32, i32)>,
+}
+
+impl ColumnIndexSequence {
+    pub fn next_column_index(&mut self, field_id: i32) -> u32 {
+        let idx = self.current_index;
+        self.current_index += 1;
+        self.mapping.push((field_id, idx as i32));
+        idx
+    }
+
+    pub fn skip(&mut self) {
+        self.current_index += 1;
+    }
+}
+
+/// A trait to pick which kind of field encoding to use for a field
+///
+/// Unlike the ArrayEncodingStrategy, the field encoding strategy is
+/// chosen before any data is generated and the same field encoder is
+/// used for all data in the field.
+pub trait FieldEncodingStrategy: Send + Sync + std::fmt::Debug {
+    /// Choose and create an appropriate field encoder for the given
+    /// field.
+    ///
+    /// The field encoder can be chosen on the data type as well as
+    /// any metadata that is attached to the field.
+    ///
+    /// The `encoding_strategy_root` is the encoder that should be
+    /// used to encode any inner data in struct / list / etc. fields.
+    ///
+    /// Initially it is the same as `self` and generally should be
+    /// forwarded to any inner encoding strategy.
+    fn create_field_encoder(
+        &self,
+        encoding_strategy_root: &dyn FieldEncodingStrategy,
+        field: &Field,
+        column_index: &mut ColumnIndexSequence,
+        cache_bytes_per_column: u64,
+        keep_original_array: bool,
+        config: &HashMap<String, String>,
+    ) -> Result<Box<dyn FieldEncoder>>;
+}
+
+/// The core field encoding strategy is a set of basic encodings that
+/// are generally applicable in most scenarios.
+#[derive(Debug)]
+pub struct CoreFieldEncodingStrategy {
+    array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
+}
+
+impl Default for CoreFieldEncodingStrategy {
+    fn default() -> Self {
+        Self {
+            array_encoding_strategy: Arc::new(CoreArrayEncodingStrategy),
+        }
+    }
+}
+
+impl FieldEncodingStrategy for CoreFieldEncodingStrategy {
+    fn create_field_encoder(
+        &self,
+        encoding_strategy_root: &dyn FieldEncodingStrategy,
         field: &Field,
+        column_index: &mut ColumnIndexSequence,
         cache_bytes_per_column: u64,
         keep_original_array: bool,
-        col_idx: &mut u32,
-        field_col_mapping: &mut Vec<(i32, i32)>,
+        _config: &HashMap<String, String>,
     ) -> Result<Box<dyn FieldEncoder>> {
         match field.data_type() {
             DataType::Boolean
@@ -197,94 +312,99 @@ impl BatchEncoder {
             | DataType::UInt64
             | DataType::UInt8
             | DataType::FixedSizeBinary(_)
-            | DataType::FixedSizeList(_, _) => {
-                let my_col_idx = *col_idx;
-                *col_idx += 1;
-                field_col_mapping.push((field.id, my_col_idx as i32));
-                Ok(Box::new(PrimitiveFieldEncoder::try_new(
-                    cache_bytes_per_column,
-                    keep_original_array,
-                    &field.data_type(),
-                    my_col_idx,
-                )?))
-            }
-            DataType::List(_) => {
-                let my_col_idx = *col_idx;
-                field_col_mapping.push((field.id, my_col_idx as i32));
-                *col_idx += 1;
-                let inner_encoding = Self::get_encoder_for_field(
+            | DataType::FixedSizeList(_, _) => Ok(Box::new(PrimitiveFieldEncoder::try_new(
+                cache_bytes_per_column,
+                keep_original_array,
+                self.array_encoding_strategy.clone(),
+                column_index.next_column_index(field.id),
+            )?)),
+            DataType::List(child) => {
+                let list_idx = column_index.next_column_index(field.id);
+                let inner_encoding = encoding_strategy_root.create_field_encoder(
+                    encoding_strategy_root,
                     &field.children[0],
+                    column_index,
                     cache_bytes_per_column,
                     keep_original_array,
-                    col_idx,
-                    field_col_mapping,
+                    child.metadata(),
                 )?;
                 Ok(Box::new(ListFieldEncoder::new(
                     inner_encoding,
                     cache_bytes_per_column,
                     keep_original_array,
-                    my_col_idx,
+                    list_idx,
                 )))
             }
             DataType::Struct(_) => {
-                let header_col_idx = *col_idx;
-                field_col_mapping.push((field.id, header_col_idx as i32));
-                *col_idx += 1;
+                let header_idx = column_index.next_column_index(field.id);
                 let children_encoders = field
                     .children
                     .iter()
                     .map(|field| {
-                        Self::get_encoder_for_field(
+                        self.create_field_encoder(
+                            encoding_strategy_root,
                             field,
+                            column_index,
                             cache_bytes_per_column,
                             keep_original_array,
-                            col_idx,
-                            field_col_mapping,
+                            &field.metadata,
                         )
                     })
                     .collect::<Result<Vec<_>>>()?;
                 Ok(Box::new(StructFieldEncoder::new(
                     children_encoders,
-                    header_col_idx,
+                    header_idx,
                 )))
             }
             DataType::Utf8 | DataType::Binary | DataType::LargeUtf8 | DataType::LargeBinary => {
-                let my_col_idx = *col_idx;
-                field_col_mapping.push((field.id, my_col_idx as i32));
-                *col_idx += 2;
+                let list_idx = column_index.next_column_index(field.id);
+                column_index.skip();
                 Ok(Box::new(BinaryFieldEncoder::new(
                     cache_bytes_per_column,
                     keep_original_array,
-                    my_col_idx,
+                    list_idx,
                 )))
             }
-            _ => todo!("Implement encoding for data type {}", field.data_type()),
+            _ => todo!("Implement encoding for field {}", field),
         }
     }
+}
 
+/// A batch encoder that encodes RecordBatch objects by delegating
+/// to field encoders for each top-level field in the batch.
+pub struct BatchEncoder {
+    pub field_encoders: Vec<Box<dyn FieldEncoder>>,
+    pub field_id_to_column_index: Vec<(i32, i32)>,
+}
+
+impl BatchEncoder {
     pub fn try_new(
         schema: &Schema,
+        strategy: &dyn FieldEncodingStrategy,
         cache_bytes_per_column: u64,
         keep_original_array: bool,
     ) -> Result<Self> {
         let mut col_idx = 0;
-        let mut field_col_mapping = Vec::new();
+        let mut col_idx_sequence = ColumnIndexSequence::default();
         let field_encoders = schema
             .fields
             .iter()
             .map(|field| {
-                Self::get_encoder_for_field(
+                let encoder = strategy.create_field_encoder(
+                    strategy,
                     field,
+                    &mut col_idx_sequence,
                     cache_bytes_per_column,
                     keep_original_array,
-                    &mut col_idx,
-                    &mut field_col_mapping,
-                )
+                    &field.metadata,
+                )?;
+                col_idx += encoder.as_ref().num_columns();
+                Ok(encoder)
             })
             .collect::<Result<Vec<_>>>()?;
         Ok(Self {
             field_encoders,
-            field_id_to_column_index: field_col_mapping,
+            field_id_to_column_index: col_idx_sequence.mapping,
         })
     }
 
@@ -312,15 +432,17 @@ pub struct EncodedBatch {
 /// niche situations like IPC.
 pub async fn encode_batch(
     batch: &RecordBatch,
+    encoding_strategy: &dyn FieldEncodingStrategy,
     cache_bytes_per_column: u64,
 ) -> Result<EncodedBatch> {
     let mut data_buffer = BytesMut::new();
     let lance_schema = Schema::try_from(batch.schema().as_ref())?;
-    // At this point, this is just a test utility, and there is no point in copying allocations
-    // This could become configurable in the future if needed.
-    let keep_original_array = true;
-    let batch_encoder =
-        BatchEncoder::try_new(&lance_schema, cache_bytes_per_column, keep_original_array)?;
+    let batch_encoder = BatchEncoder::try_new(
+        &lance_schema,
+        encoding_strategy,
+        cache_bytes_per_column,
+        true,
+    )?;
     let mut page_table = Vec::new();
     for (arr, mut encoder) in batch.columns().iter().zip(batch_encoder.field_encoders) {
         let mut tasks = encoder.maybe_encode(arr.clone())?;
@@ -332,12 +454,12 @@ pub async fn encode_batch(
         buffers.sort_by_key(|b| b.index);
         let mut buffer_offsets_and_sizes = Vec::new();
         for buffer in buffers {
-            let offset = data_buffer.len() as u64;
-            let size = buffer.parts.iter().map(|p| p.len()).sum::<usize>() as u64;
-            buffer_offsets_and_sizes.push((offset, size));
+            let buffer_offset = data_buffer.len() as u64;
             for part in buffer.parts {
                 data_buffer.extend_from_slice(&part);
             }
+            let size = data_buffer.len() as u64 - buffer_offset;
+            buffer_offsets_and_sizes.push((buffer_offset, size));
         }
         pages.push(Arc::new(PageInfo {
             buffer_offsets_and_sizes: Arc::new(buffer_offsets_and_sizes),
diff --git a/rust/lance-encoding/src/encodings/logical/binary.rs b/rust/lance-encoding/src/encodings/logical/binary.rs
index 6fb16a5934..35e1bde474 100644
--- a/rust/lance-encoding/src/encodings/logical/binary.rs
+++ b/rust/lance-encoding/src/encodings/logical/binary.rs
@@ -19,7 +19,7 @@ use crate::{
     decoder::{
         DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask,
         SchedulerContext,
     },
-    encoder::{EncodeTask, FieldEncoder},
+    encoder::{CoreArrayEncodingStrategy, EncodeTask, FieldEncoder},
 };
 
 use super::{list::ListFieldEncoder, primitive::PrimitiveFieldEncoder};
@@ -173,7 +173,7 @@ impl BinaryFieldEncoder {
             PrimitiveFieldEncoder::try_new(
                 cache_bytes_per_column,
                 keep_original_array,
-                &DataType::UInt8,
+                Arc::new(CoreArrayEncodingStrategy),
                 column_index + 1,
             )
             .unwrap(),
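`ColumnIndexSequence` replaces the hand-threaded `col_idx` / `field_col_mapping` pair. A quick illustration of the bookkeeping, including the `skip()` call that `BinaryFieldEncoder` relies on because string/binary fields occupy two physical columns (offsets plus bytes) but only the first gets a field-id mapping:

```rust
use lance_encoding::encoder::ColumnIndexSequence;

fn main() {
    let mut seq = ColumnIndexSequence::default();

    // A primitive field (field id 0) claims one column.
    assert_eq!(seq.next_column_index(0), 0);

    // A string field (field id 1) claims its column, then skips one so the
    // inner bytes column occupies index 2 without appearing in the mapping.
    assert_eq!(seq.next_column_index(1), 1);
    seq.skip();

    // The next field therefore lands on column 3.
    assert_eq!(seq.next_column_index(2), 3);
}
```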
diff --git a/rust/lance-encoding/src/encodings/logical/list.rs b/rust/lance-encoding/src/encodings/logical/list.rs
index 9b917a3ae1..d919633515 100644
--- a/rust/lance-encoding/src/encodings/logical/list.rs
+++ b/rust/lance-encoding/src/encodings/logical/list.rs
@@ -24,14 +24,17 @@ use crate::{
         SchedulerContext,
     },
     encoder::{ArrayEncoder, EncodeTask, EncodedArray, EncodedPage, FieldEncoder},
-    encodings::logical::r#struct::SimpleStructScheduler,
+    encodings::{
+        logical::r#struct::SimpleStructScheduler,
+        physical::{
+            basic::BasicEncoder,
+            value::{CompressionScheme, ValueEncoder},
+        },
+    },
     format::pb,
 };
 
-use super::{
-    primitive::{AccumulationQueue, PrimitiveFieldEncoder},
-    r#struct::SimpleStructDecoder,
-};
+use super::{primitive::AccumulationQueue, r#struct::SimpleStructDecoder};
 
 /// A page scheduler for list fields that encodes offsets in one field and items in another
 ///
@@ -674,9 +677,9 @@ impl ListOffsetsEncoder {
                 column_index,
                 keep_original_array,
             ),
-            inner_encoder: PrimitiveFieldEncoder::array_encoder_from_data_type(&DataType::UInt64)
-                .unwrap()
-                .into(),
+            inner_encoder: Arc::new(BasicEncoder::new(Box::new(
+                ValueEncoder::try_new(&DataType::Int64, CompressionScheme::None).unwrap(),
+            ))),
             column_index,
         }
     }
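Because `PrimitiveFieldEncoder` (changed next) now asks its `ArrayEncodingStrategy` for an encoder at flush time, a strategy gets to look at the buffered arrays before committing to an encoding. A sketch of a custom strategy that inspects page statistics and then delegates to the core implementation; the null-density heuristic is purely illustrative:

```rust
use arrow_array::{Array, ArrayRef};
use lance_core::Result;
use lance_encoding::encoder::{ArrayEncoder, ArrayEncodingStrategy, CoreArrayEncodingStrategy};

#[derive(Debug, Default)]
struct NullAwareStrategy {
    core: CoreArrayEncodingStrategy,
}

impl ArrayEncodingStrategy for NullAwareStrategy {
    fn create_array_encoder(&self, arrays: &[ArrayRef]) -> Result<Box<dyn ArrayEncoder>> {
        let rows: usize = arrays.iter().map(|arr| arr.len()).sum();
        let nulls: usize = arrays.iter().map(|arr| arr.null_count()).sum();
        // A real strategy might pick a sparse physical encoding when a page is
        // mostly null; this sketch only computes the signal and then falls
        // back to the core strategy either way.
        let _mostly_null = rows > 0 && nulls * 2 > rows;
        self.core.create_array_encoder(arrays)
    }
}
```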
diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs
index 80205c703f..7e143e68ce 100644
--- a/rust/lance-encoding/src/encodings/logical/primitive.rs
+++ b/rust/lance-encoding/src/encodings/logical/primitive.rs
@@ -26,18 +26,13 @@ use snafu::{location, Location};
 
 use lance_core::{Error, Result};
 
-use crate::encodings::physical::parse_compression_scheme;
-use crate::encodings::physical::value::CompressionScheme;
 use crate::{
     decoder::{
         DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask, PageInfo,
         PhysicalPageDecoder, PhysicalPageScheduler, SchedulerContext,
     },
-    encoder::{ArrayEncoder, EncodeTask, EncodedPage, FieldEncoder},
-    encodings::physical::{
-        basic::BasicEncoder, decoder_from_array_encoding, fixed_size_list::FslEncoder,
-        value::ValueEncoder, ColumnBuffers, PageBuffers,
-    },
+    encoder::{ArrayEncodingStrategy, EncodeTask, EncodedPage, FieldEncoder},
+    encodings::physical::{decoder_from_array_encoding, ColumnBuffers, PageBuffers},
 };
 
 /// A page scheduler for primitive fields
@@ -521,34 +516,15 @@ impl AccumulationQueue {
 
 pub struct PrimitiveFieldEncoder {
     accumulation_queue: AccumulationQueue,
-    encoder: Arc<dyn ArrayEncoder>,
+    array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
     column_index: u32,
 }
 
-fn get_compression_scheme() -> CompressionScheme {
-    let compression_scheme = std::env::var("LANCE_PAGE_COMPRESSION").unwrap_or("none".to_string());
-    parse_compression_scheme(&compression_scheme).unwrap_or(CompressionScheme::None)
-}
-
 impl PrimitiveFieldEncoder {
-    pub fn array_encoder_from_data_type(data_type: &DataType) -> Result<Box<dyn ArrayEncoder>> {
-        match data_type {
-            DataType::FixedSizeList(inner, dimension) => {
-                Ok(Box::new(BasicEncoder::new(Box::new(FslEncoder::new(
-                    Self::array_encoder_from_data_type(inner.data_type())?,
-                    *dimension as u32,
-                )))))
-            }
-            _ => Ok(Box::new(BasicEncoder::new(Box::new(
-                ValueEncoder::try_new(data_type, get_compression_scheme())?,
-            )))),
-        }
-    }
-
     pub fn try_new(
         cache_bytes: u64,
         keep_original_array: bool,
-        data_type: &DataType,
+        array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
         column_index: u32,
     ) -> Result<Self> {
         Ok(Self {
@@ -558,33 +534,16 @@ impl PrimitiveFieldEncoder {
                 keep_original_array,
             ),
             column_index,
-            encoder: Arc::from(Self::array_encoder_from_data_type(data_type)?),
+            array_encoding_strategy,
         })
     }
 
-    pub fn new_with_encoder(
-        cache_bytes: u64,
-        keep_original_array: bool,
-        column_index: u32,
-        encoder: Arc<dyn ArrayEncoder>,
-    ) -> Self {
-        Self {
-            accumulation_queue: AccumulationQueue::new(
-                cache_bytes,
-                column_index,
-                keep_original_array,
-            ),
-            column_index,
-            encoder,
-        }
-    }
-
     // Creates an encode task, consuming all buffered data
-    fn do_flush(&mut self, arrays: Vec<ArrayRef>) -> EncodeTask {
-        let encoder = self.encoder.clone();
+    fn do_flush(&mut self, arrays: Vec<ArrayRef>) -> Result<EncodeTask> {
+        let encoder = self.array_encoding_strategy.create_array_encoder(&arrays)?;
         let column_idx = self.column_index;
 
-        tokio::task::spawn(async move {
+        Ok(tokio::task::spawn(async move {
             let num_rows = arrays.iter().map(|arr| arr.len() as u32).sum();
             let mut buffer_index = 0;
             let array = encoder.encode(&arrays, &mut buffer_index)?;
@@ -595,7 +554,7 @@ impl PrimitiveFieldEncoder {
             })
         })
         .map(|res_res| res_res.unwrap())
-        .boxed()
+        .boxed())
     }
 }
 
@@ -603,7 +562,7 @@ impl FieldEncoder for PrimitiveFieldEncoder {
     // Buffers data, if there is enough to write a page then we create an encode task
     fn maybe_encode(&mut self, array: ArrayRef) -> Result<Vec<EncodeTask>> {
         if let Some(arrays) = self.accumulation_queue.insert(array) {
-            Ok(vec![self.do_flush(arrays)])
+            Ok(vec![self.do_flush(arrays)?])
         } else {
             Ok(vec![])
         }
@@ -612,7 +571,7 @@ impl FieldEncoder for PrimitiveFieldEncoder {
     // If there is any data left in the buffer then create an encode task from it
     fn flush(&mut self) -> Result<Vec<EncodeTask>> {
         if let Some(arrays) = self.accumulation_queue.flush() {
-            Ok(vec![self.do_flush(arrays)])
+            Ok(vec![self.do_flush(arrays)?])
         } else {
             Ok(vec![])
         }
diff --git a/rust/lance-encoding/src/encodings/physical.rs b/rust/lance-encoding/src/encodings/physical.rs
index f051c68ab8..6a93428342 100644
--- a/rust/lance-encoding/src/encodings/physical.rs
+++ b/rust/lance-encoding/src/encodings/physical.rs
@@ -4,6 +4,7 @@ use crate::encodings::physical::value::CompressionScheme;
 use crate::{decoder::PhysicalPageScheduler, format::pb};
 
+use self::value::parse_compression_scheme;
 use self::{
     basic::BasicPageScheduler, bitmap::DenseBitmapScheduler, fixed_size_list::FixedListScheduler,
     value::ValuePageScheduler,
@@ -49,14 +50,6 @@ fn get_buffer(buffer_desc: &pb::Buffer, buffers: &PageBuffers) -> (u64, u64) {
     }
 }
 
-pub fn parse_compression_scheme(scheme: &str) -> Result<CompressionScheme, String> {
-    match scheme {
-        "none" => Ok(CompressionScheme::None),
-        "zstd" => Ok(CompressionScheme::Zstd),
-        _ => Err(format!("Unknown compression scheme: {}", scheme)),
-    }
-}
-
 /// Convert a protobuf buffer encoding into a physical page scheduler
 fn get_buffer_decoder(
     encoding: &pb::Flat,
diff --git a/rust/lance-encoding/src/encodings/physical/value.rs b/rust/lance-encoding/src/encodings/physical/value.rs
index 0b7d59cb90..b07b7e765a 100644
--- a/rust/lance-encoding/src/encodings/physical/value.rs
+++ b/rust/lance-encoding/src/encodings/physical/value.rs
@@ -41,6 +41,17 @@ impl fmt::Display for CompressionScheme {
     }
 }
 
+pub fn parse_compression_scheme(scheme: &str) -> Result<CompressionScheme> {
+    match scheme {
+        "none" => Ok(CompressionScheme::None),
+        "zstd" => Ok(CompressionScheme::Zstd),
+        _ => Err(Error::invalid_input(
+            format!("Unknown compression scheme: {}", scheme),
+            location!(),
+        )),
+    }
+}
+
 /// Scheduler for a simple encoding where buffers of fixed-size items are stored as-is on disk
 #[derive(Debug, Clone, Copy)]
 pub struct ValuePageScheduler {
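With `parse_compression_scheme` now living in `value.rs` and returning `Error::invalid_input`, callers get a lance `Result` instead of a bare `String` error. A small usage sketch, assuming these items are publicly reachable outside the crate:

```rust
use lance_encoding::encodings::physical::value::{parse_compression_scheme, CompressionScheme};

fn main() {
    // The two schemes this patch recognizes; get_compression_scheme() in
    // encoder.rs reads the choice from the LANCE_PAGE_COMPRESSION env var
    // and falls back to None when the var is unset or unparseable.
    assert!(matches!(parse_compression_scheme("none"), Ok(CompressionScheme::None)));
    assert!(matches!(parse_compression_scheme("zstd"), Ok(CompressionScheme::Zstd)));
    assert!(parse_compression_scheme("lz4").is_err());
}
```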
diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs
index c9f7003008..61162e73cb 100644
--- a/rust/lance-encoding/src/testing.rs
+++ b/rust/lance-encoding/src/testing.rs
@@ -1,7 +1,7 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright The Lance Authors
 
-use std::{ops::Range, sync::Arc};
+use std::{collections::HashMap, ops::Range, sync::Arc};
 
 use arrow_array::{Array, UInt64Array};
 use arrow_schema::{DataType, Field, Schema};
@@ -16,7 +16,10 @@ use lance_datagen::{array, gen, RowCount, Seed};
 
 use crate::{
     decoder::{BatchDecodeStream, ColumnInfo, DecodeBatchScheduler, DecoderMessage, PageInfo},
-    encoder::{BatchEncoder, EncodedPage, FieldEncoder},
+    encoder::{
+        ColumnIndexSequence, CoreFieldEncodingStrategy, EncodedPage, FieldEncoder,
+        FieldEncodingStrategy,
+    },
     encodings::logical::r#struct::SimpleStructDecoder,
     EncodingsIo,
 };
@@ -99,17 +102,20 @@ pub async fn check_round_trip_encoding_random(field: Field) {
     let lance_field = lance_core::datatypes::Field::try_from(&field).unwrap();
     for page_size in [4096, 1024 * 1024] {
         debug!("Testing random data with a page size of {}", page_size);
+        let encoding_strategy = CoreFieldEncodingStrategy::default();
+        let encoding_config = HashMap::new();
         let encoder_factory = || {
-            let mut col_idx = 0;
-            let mut field_id_to_col_index = Vec::new();
-            BatchEncoder::get_encoder_for_field(
-                &lance_field,
-                page_size,
-                /*keep_original_array=*/ true,
-                &mut col_idx,
-                &mut field_id_to_col_index,
-            )
-            .unwrap()
+            let mut column_index_seq = ColumnIndexSequence::default();
+            encoding_strategy
+                .create_field_encoder(
+                    &encoding_strategy,
+                    &lance_field,
+                    &mut column_index_seq,
+                    page_size,
+                    true,
+                    &encoding_config,
+                )
+                .unwrap()
         };
         check_round_trip_field_encoding_random(encoder_factory, field.clone()).await
    }
@@ -174,16 +180,19 @@ pub async fn check_round_trip_encoding_of_data(data: Vec<Arc<dyn Array>>, test_c
     let field = Field::new("", example_data.data_type().clone(), true);
     let lance_field = lance_core::datatypes::Field::try_from(&field).unwrap();
     for page_size in [4096, 1024 * 1024] {
-        let mut col_idx = 0;
-        let mut field_id_to_col_index = Vec::new();
-        let encoder = BatchEncoder::get_encoder_for_field(
-            &lance_field,
-            page_size,
-            /*keep_original=*/ true,
-            &mut col_idx,
-            &mut field_id_to_col_index,
-        )
-        .unwrap();
+        let encoding_strategy = CoreFieldEncodingStrategy::default();
+        let encoding_config = HashMap::new();
+        let mut column_index_seq = ColumnIndexSequence::default();
+        let encoder = encoding_strategy
+            .create_field_encoder(
+                &encoding_strategy,
+                &lance_field,
+                &mut column_index_seq,
+                page_size,
+                true,
+                &encoding_config,
+            )
+            .unwrap();
         check_round_trip_encoding_inner(encoder, &field, data.clone(), test_cases).await
     }
 }
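The reworked helpers keep round-trip coverage cheap for any new encoder. A sketch of a test that leans on them (assuming a tokio test runtime, as the crate's other tests use):

```rust
use arrow_schema::{DataType, Field};
use lance_encoding::testing::check_round_trip_encoding_random;

#[tokio::test]
async fn round_trip_int32() {
    // Encodes random Int32 data at both tested page sizes (4 KiB and 1 MiB)
    // and verifies that decoding reproduces the original arrays.
    check_round_trip_encoding_random(Field::new("x", DataType::Int32, true)).await;
}
```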
diff --git a/rust/lance-file/src/v2/writer.rs b/rust/lance-file/src/v2/writer.rs
index 76b8bc3f0c..64a5cadc1a 100644
--- a/rust/lance-file/src/v2/writer.rs
+++ b/rust/lance-file/src/v2/writer.rs
@@ -1,13 +1,19 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright The Lance Authors
 
+use std::sync::Arc;
+
 use arrow_array::RecordBatch;
+use bytes::{BufMut, Bytes, BytesMut};
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use lance_core::datatypes::Schema as LanceSchema;
 use lance_core::{Error, Result};
-use lance_encoding::encoder::{BatchEncoder, EncodeTask, EncodedPage, FieldEncoder};
+use lance_encoding::encoder::{
+    BatchEncoder, CoreFieldEncodingStrategy, EncodeTask, EncodedBatch, EncodedPage, FieldEncoder,
+    FieldEncodingStrategy,
+};
 use lance_io::object_writer::ObjectWriter;
 use lance_io::traits::Writer;
 use log::debug;
@@ -59,6 +65,7 @@ pub struct FileWriterOptions {
     /// while) might keep a much larger record batch around in memory (even though most
     /// of that batch's data has been written to disk)
     pub keep_original_array: Option<bool>,
+    pub encoding_strategy: Option<Arc<dyn FieldEncodingStrategy>>,
 }
 
 pub struct FileWriter {
@@ -100,8 +107,16 @@ impl FileWriter {
         schema.validate()?;
 
         let keep_original_array = options.keep_original_array.unwrap_or(false);
-
-        let encoder = BatchEncoder::try_new(&schema, cache_bytes_per_column, keep_original_array)?;
+        let encoding_strategy = options
+            .encoding_strategy
+            .unwrap_or_else(|| Arc::new(CoreFieldEncodingStrategy::default()));
+
+        let encoder = BatchEncoder::try_new(
+            &schema,
+            encoding_strategy.as_ref(),
+            cache_bytes_per_column,
+            keep_original_array,
+        )?;
 
         let num_columns = encoder.num_columns();
         let column_writers = encoder.field_encoders;
@@ -257,20 +272,22 @@ impl FileWriter {
         Ok(metadata_positions)
     }
 
-    fn make_file_descriptor(&self) -> Result<pb::FileDescriptor> {
-        let lance_schema = lance_core::datatypes::Schema::try_from(&self.schema)?;
-        let fields_with_meta = FieldsWithMeta::from(&lance_schema);
+    fn make_file_descriptor(
+        schema: &lance_core::datatypes::Schema,
+        num_rows: u64,
+    ) -> Result<pb::FileDescriptor> {
+        let fields_with_meta = FieldsWithMeta::from(schema);
         Ok(pb::FileDescriptor {
             schema: Some(pb::Schema {
                 fields: fields_with_meta.fields.0,
                 metadata: fields_with_meta.metadata,
             }),
-            length: self.rows_written,
+            length: num_rows,
         })
     }
 
     async fn write_global_buffers(&mut self) -> Result<Vec<(u64, u64)>> {
-        let file_descriptor = self.make_file_descriptor()?;
+        let file_descriptor = Self::make_file_descriptor(&self.schema, self.rows_written)?;
         let file_descriptor_bytes = file_descriptor.encode_to_vec();
         let file_descriptor_len = file_descriptor_bytes.len() as u64;
         let file_descriptor_position = self.writer.tell().await? as u64;
@@ -368,6 +385,122 @@ impl FileWriter {
     }
 }
 
+/// Utility trait for converting EncodedBatch to Bytes using the
+/// lance file format
+pub trait EncodedBatchWriteExt {
+    /// Serializes into a lance file, including the schema
+    fn try_to_self_described_lance(&self) -> Result<Bytes>;
+    /// Serializes into a lance file, without the schema.
+    ///
+    /// The schema must be provided to deserialize the buffer
+    fn try_to_mini_lance(&self) -> Result<Bytes>;
+}
+
+// Creates a lance footer and appends it to the encoded data
+//
+// The logic here is very similar to logic in the FileWriter except we
+// are using BufMut (put_xyz) instead of AsyncWrite (write_xyz).
+fn concat_lance_footer(batch: &EncodedBatch, write_schema: bool) -> Result<Bytes> {
+    // Estimating 1MiB for the file footer
+    let mut data = BytesMut::with_capacity(batch.data.len() + 1024 * 1024);
+    data.put(batch.data.clone());
+    // Write column metadata buffers
+    let column_metadata_buffers_start = data.len() as u64;
+    // Write global buffers (we write the schema here)
+    let global_buffers_start = data.len() as u64;
+    let global_buffers = if write_schema {
+        let lance_schema = lance_core::datatypes::Schema::try_from(batch.schema.as_ref())?;
+        let descriptor = FileWriter::make_file_descriptor(&lance_schema, batch.num_rows)?;
+        let descriptor_bytes = descriptor.encode_to_vec();
+        let descriptor_len = descriptor_bytes.len() as u64;
+        data.put(descriptor_bytes.as_slice());
+
+        vec![(global_buffers_start, descriptor_len)]
+    } else {
+        vec![]
+    };
+    let col_metadata_start = data.len() as u64;
+
+    let mut col_metadata_positions = Vec::new();
+    // Write column metadata
+    for col in &batch.page_table {
+        let position = data.len() as u64;
+        let pages = col
+            .page_infos
+            .iter()
+            .map(|page_info| {
+                let encoded_encoding = Any::from_msg(&page_info.encoding)?.encode_to_vec();
+                let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) = page_info
+                    .buffer_offsets_and_sizes
+                    .as_ref()
+                    .clone()
+                    .into_iter()
+                    .unzip();
+                Ok(pbfile::column_metadata::Page {
+                    buffer_offsets,
+                    buffer_sizes,
+                    encoding: Some(pbfile::Encoding {
+                        location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
+                            encoding: encoded_encoding,
+                        })),
+                    }),
+                    length: page_info.num_rows,
+                })
+            })
+            .collect::<Result<Vec<_>>>()?;
+        let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) =
+            col.buffer_offsets_and_sizes.clone().into_iter().unzip();
+        let column = pbfile::ColumnMetadata {
+            pages,
+            buffer_offsets,
+            buffer_sizes,
+            encoding: Some(pbfile::Encoding {
+                location: Some(pbfile::encoding::Location::None(())),
+            }),
+        };
+        let column_bytes = column.encode_to_vec();
+        // Record (position, length) of this column's metadata; the length must
+        // come from the encoded bytes since they haven't been appended yet
+        col_metadata_positions.push((position, column_bytes.len() as u64));
+        data.put(column_bytes.as_slice());
+    }
+    // Write the column metadata offsets table
+    let cmo_table_start = data.len() as u64;
+    for (meta_pos, meta_len) in col_metadata_positions {
+        data.put_u64_le(meta_pos);
+        data.put_u64_le(meta_len);
+    }
+    // Write the global buffers offsets table
+    let gbo_table_start = data.len() as u64;
+    let num_global_buffers = global_buffers.len() as u32;
+    for (gbo_pos, gbo_len) in global_buffers {
+        data.put_u64_le(gbo_pos);
+        data.put_u64_le(gbo_len);
+    }
+
+    // Write the footer
+    data.put_u64_le(column_metadata_buffers_start);
+    data.put_u64_le(global_buffers_start);
+    data.put_u64_le(col_metadata_start);
+    data.put_u64_le(cmo_table_start);
+    data.put_u64_le(gbo_table_start);
+    data.put_u32_le(num_global_buffers);
+    data.put_u32_le(batch.page_table.len() as u32);
+    data.put_u16_le(MAJOR_VERSION as u16);
+    data.put_u16_le(MINOR_VERSION_NEXT);
+    data.put(MAGIC.as_slice());
+
+    Ok(data.freeze())
+}
+
+impl EncodedBatchWriteExt for EncodedBatch {
+    fn try_to_self_described_lance(&self) -> Result<Bytes> {
+        concat_lance_footer(self, true)
+    }
+
+    fn try_to_mini_lance(&self) -> Result<Bytes> {
+        concat_lance_footer(self, false)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;