From 2d6806247f0e48061d49c1201c37c6c7164e9338 Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Fri, 31 May 2024 14:23:39 -0700
Subject: [PATCH] feat: stable row id manifest changes (#2363)

* Adds stable row ids to manifest

* Support writing row id sequences for append, overwrite, and update.

Epic: #2307
---
 docs/format.rst                              |  80 ++++++
 protos/rowids.proto                          |   6 +-
 protos/table.proto                           |  29 +++
 python/python/lance/fragment.py              |   2 +-
 python/python/tests/test_fragment.py         |   2 +-
 rust/lance-table/src/feature_flags.rs        |  90 +++++++
 rust/lance-table/src/format/fragment.rs      |  54 +++-
 rust/lance-table/src/format/manifest.rs      |  21 ++
 rust/lance-table/src/lib.rs                  |   1 +
 rust/lance-table/src/rowids.rs               |   3 +-
 rust/lance-table/src/rowids/bitmap.rs        |   4 +-
 rust/lance-table/src/rowids/encoded_array.rs |   4 +-
 rust/lance-table/src/rowids/index.rs         |  15 ++
 rust/lance-table/src/rowids/segment.rs       |  14 +
 rust/lance/src/dataset.rs                    |  18 +-
 rust/lance/src/dataset/feature_flags.rs      |  52 ----
 rust/lance/src/dataset/optimize.rs           |   6 +
 rust/lance/src/dataset/rowids.rs             | 253 +++++++++++++++++++
 rust/lance/src/dataset/schema_evolution.rs   |   1 +
 rust/lance/src/dataset/transaction.rs        |  80 ++++--
 rust/lance/src/dataset/write.rs              |   7 +
 rust/lance/src/io/commit.rs                  |   4 +
 rust/lance/src/utils/test.rs                 |   1 +
 23 files changed, 666 insertions(+), 81 deletions(-)
 create mode 100644 rust/lance-table/src/feature_flags.rs
 delete mode 100644 rust/lance/src/dataset/feature_flags.rs
 create mode 100644 rust/lance/src/dataset/rowids.rs

diff --git a/docs/format.rst b/docs/format.rst
index fa163c8609..d2494c36ad 100644
--- a/docs/format.rst
+++ b/docs/format.rst
@@ -403,3 +403,83 @@ compatible. However, writers should not write extra fields that aren't
described in this document. Until they are defined in the specification, there
is no guarantee that readers will be able to safely interpret new forms of
statistics.


Feature: Move-Stable Row IDs
----------------------------

The row id feature assigns a unique u64 id to each row in the table. This id is
stable after the row is moved (such as during compaction), but is not
necessarily stable after the row is updated. (A future feature may make row ids
stable after updates.) To make access fast, a secondary index is created that
maps row ids to their locations in the table. The row id sequences from which
this index is built are stored in each fragment's metadata.

row id
    A unique auto-incrementing u64 id assigned to each row in the table.

row address
    The current location of a row in the table. This is a u64 that can be
    thought of as a pair of two u32 values: the fragment id and the local row
    offset. For example, if the row address is (42, 9), then the row is in the
    42nd fragment and is the 10th row in that fragment.

row id sequence
    The sequence of row ids in a fragment.

row id index
    A secondary index that maps row ids to row addresses. This index is
    constructed by reading all the row id sequences.

Assigning row ids
~~~~~~~~~~~~~~~~~

Row ids are assigned in a monotonically increasing sequence. The next row id is
stored in the manifest as the field ``next_row_id``. This starts at zero. When
making a commit, the writer uses that field to assign row ids to new fragments.
If the commit fails, the writer re-reads the latest ``next_row_id``, re-assigns
the row ids to the new fragments, and then tries again. This is similar to how
``max_fragment_id`` is used to assign new fragment ids.

When a row is updated, it is typically assigned a new row id rather than
reusing the old one. This is because this feature doesn't have a mechanism to
update secondary indices that may reference the old row id values. By deleting
the old row id and creating a new one, secondary indices are kept from
referencing stale data.
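
The sketch below is an editorial illustration only (the function and its
signature are hypothetical, not part of the Lance API); it shows how a writer
hands out one contiguous range of ids per new fragment and advances
``next_row_id``:

.. code-block:: rust

   use std::ops::Range;

   /// Give each new fragment a contiguous row id range, starting at the
   /// manifest's current `next_row_id`. Returns the per-fragment ranges and
   /// the value to write back to the manifest as the new `next_row_id`.
   fn assign_ranges(next_row_id: u64, fragment_sizes: &[u64]) -> (Vec<Range<u64>>, u64) {
       let mut next = next_row_id;
       let mut ranges = Vec::with_capacity(fragment_sizes.len());
       for &num_rows in fragment_sizes {
           ranges.push(next..next + num_rows);
           next += num_rows;
       }
       (ranges, next)
   }

On a conflicting commit, the writer simply repeats this with the
``next_row_id`` read from the winning manifest, so assigned ranges never
overlap.
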
Row ID sequences
~~~~~~~~~~~~~~~~

The row id values for a fragment are stored in a ``RowIdSequence`` protobuf
message. This is described in the `protos/rowids.proto`_ file. Row id sequences
are just arrays of u64 values, which have representations optimized for the
common case where they are sorted and possibly contiguous. For example, a new
fragment will have a row id sequence that is just a simple range, so it is
stored as a ``start`` and ``end`` value.

These sequence messages are either stored inline in the fragment metadata, or
are written to a separate file and referenced from the fragment metadata. This
choice is typically made based on the size of the sequence. If the sequence is
small, it is stored inline. If it is large, it is written to a separate file. By
keeping the small sequences inline, we can avoid the overhead of additional IO
operations.

.. literalinclude:: ../protos/table.proto
   :language: protobuf
   :start-at: oneof row_id_sequence {
   :end-at: } // row_id_sequence
   :caption: `protos/table.proto`_
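
As a non-normative illustration, here is how a few sequences might be encoded
as ``U64Segment`` values (the exact field layouts are defined in
`protos/rowids.proto`_)::

    row ids 0, 1, ..., 999 (new fragment)    -> Range { start: 0, end: 1000 }
    0..1000 with a few ids moved elsewhere   -> RangeWithBitmap { start: 0, end: 1000, bitmap: ... }
    arbitrary ids such as [1, 42, 3]         -> EncodedU64Array (bitpacked values)
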
Row ID index
~~~~~~~~~~~~

To ensure fast access to rows by their row id, a secondary index is created that
maps row ids to their locations in the table. This index is built when a table is
loaded, based on the row id sequences in the fragments. For example, if fragment
42 has a row id sequence of ``[0, 63, 10]``, then the index will have entries for
``0 -> (42, 0)``, ``63 -> (42, 1)``, ``10 -> (42, 2)``. The exact form of this
index is left up to the implementation, but it should be optimized for fast
lookups.

.. _protos/table.proto: https://github.com/lancedb/lance/blob/main/protos/table.proto
.. _protos/rowids.proto: https://github.com/lancedb/lance/blob/main/protos/rowids.proto
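
A minimal sketch of that construction (editorial illustration only; the real
``RowIdIndex`` in ``rust/lance-table/src/rowids/index.rs`` stores pairs of
``U64Segment`` values in a range map rather than a hash map):

.. code-block:: rust

   use std::collections::HashMap;

   /// Map each row id to a (fragment id, offset) address by walking every
   /// fragment's row id sequence at load time.
   fn build_index(sequences: &[(u32, Vec<u64>)]) -> HashMap<u64, (u32, u32)> {
       let mut index = HashMap::new();
       for (fragment_id, row_ids) in sequences {
           for (offset, row_id) in row_ids.iter().enumerate() {
               index.insert(*row_id, (*fragment_id, offset as u32));
           }
       }
       index
   }
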
diff --git a/protos/rowids.proto b/protos/rowids.proto
index 36f2996f6e..11b697ad29 100644
--- a/protos/rowids.proto
+++ b/protos/rowids.proto
@@ -21,7 +21,7 @@ message U64Segment {
   message Range {
     /// The start of the range, inclusive.
     uint64 start = 1;
-    /// The enc of the range, exclusive.
+    /// The end of the range, exclusive.
     uint64 end = 2;
   }
 
@@ -43,7 +43,7 @@ message U64Segment {
   message RangeWithBitmap {
     /// The start of the range, inclusive.
     uint64 start = 1;
-    /// The enc of the range, exclusive.
+    /// The end of the range, exclusive.
     uint64 end = 2;
     /// A bitmap of the values in the range. The bitmap is a sequence of bytes,
     /// where each byte represents 8 values. The first byte represents values
@@ -67,7 +67,7 @@ message U64Segment {
     /// A general array of values, which is not sorted.
     EncodedU64Array array = 5;
   }
-}
+} // U64Segment
 
 /// A basic bitpacked array of u64 values.
 message EncodedU64Array {
diff --git a/protos/table.proto b/protos/table.proto
index bfbafde54a..cdc5840d7c 100644
--- a/protos/table.proto
+++ b/protos/table.proto
@@ -83,6 +83,8 @@ message Manifest {
   //
   // Known flags:
   // * 1: deletion files are present
+  // * 2: move_stable_row_ids: row IDs are tracked and stable after move operations
+  //      (such as compaction), but not updates.
   uint64 reader_feature_flags = 9;
 
   // Feature flags for writers.
@@ -114,6 +116,11 @@ message Manifest {
   // version of the table the transaction read from, and {uuid} is a
   // hyphen-separated UUID.
   string transaction_file = 12;
+
+  // The next unused row id. If zero, then the table does not have any rows.
+  //
+  // This is only used if the "move_stable_row_ids" feature flag is set.
+  uint64 next_row_id = 14;
 } // Manifest
 
 // Auxiliary Data attached to a version.
@@ -167,6 +174,20 @@ message DataFragment {
   // File that indicates which rows, if any, should be considered deleted.
   DeletionFile deletion_file = 3;
 
+  // TODO: What's the simplest way we can allow an inline tombstone bitmap?
+
+  // A serialized RowIdSequence message (see rowids.proto).
+  //
+  // These are the row ids for the fragment, in order of the rows as they appear.
+  // That is, if a fragment has 3 rows, and the row ids are [1, 42, 3], then the
+  // first row is row 1, the second row is row 42, and the third row is row 3.
+  oneof row_id_sequence {
+    // If small (< 200KB), the row ids are stored inline.
+    bytes inline_row_ids = 5;
+    // Otherwise, stored as part of a file.
+    ExternalFile external_row_ids = 6;
+  } // row_id_sequence
+
   // Number of original rows in the fragment, this includes rows that are
   // now marked with deletion tombstones. To compute the current number of rows,
   // subtract `deletion_file.num_deleted_rows` from this value.
@@ -273,3 +294,11 @@ message DeletionFile {
   uint64 num_deleted_rows = 4;
 } // DeletionFile
 
+message ExternalFile {
+  // Path to the file, relative to the root of the table.
+  string path = 1;
+  // The offset in the file where the data starts.
+  uint64 offset = 2;
+  // The size of the data in the file.
+  uint64 size = 3;
+}
diff --git a/python/python/lance/fragment.py b/python/python/lance/fragment.py
index 72504ad769..631e5f3d75 100644
--- a/python/python/lance/fragment.py
+++ b/python/python/lance/fragment.py
@@ -453,7 +453,7 @@ def delete(self, predicate: str) -> FragmentMetadata | None:
         >>> dataset = lance.write_dataset(tab, "dataset")
         >>> frag = dataset.get_fragment(0)
         >>> frag.delete("a > 1")
-        Fragment { id: 0, files: ..., deletion_file: Some(...), physical_rows: Some(3) }
+        Fragment { id: 0, files: ..., deletion_file: Some(...), ...}
         >>> frag.delete("a > 0") is None
         True
diff --git a/python/python/tests/test_fragment.py b/python/python/tests/test_fragment.py
index ec3636a058..59a0f44a54 100644
--- a/python/python/tests/test_fragment.py
+++ b/python/python/tests/test_fragment.py
@@ -246,7 +246,7 @@ def test_fragment_meta():
         "column_indices: [], file_major_version: 0, file_minor_version: 0 }, "
         'DataFile { path: "1.lance", fields: [1], column_indices: [], '
         "file_major_version: 0, file_minor_version: 0 }], deletion_file: None, "
-        "physical_rows: Some(100) }"
+        "row_id_meta: None, physical_rows: Some(100) }"
     )
diff --git a/rust/lance-table/src/feature_flags.rs b/rust/lance-table/src/feature_flags.rs
new file mode 100644
index 0000000000..278b9e5335
--- /dev/null
+++ b/rust/lance-table/src/feature_flags.rs
@@ -0,0 +1,90 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+//! Feature flags
+
+use snafu::{location, Location};
+
+use crate::format::Manifest;
+use lance_core::{Error, Result};
+
+/// Fragments may contain deletion files, which record the tombstones of
+/// soft-deleted rows.
+pub const FLAG_DELETION_FILES: u64 = 1;
+/// Row ids are stable after moves, but not updates. Fragments contain an index
+/// mapping row ids to row addresses.
+pub const FLAG_MOVE_STABLE_ROW_IDS: u64 = 2;
+
+/// Set the reader and writer feature flags in the manifest based on the contents of the manifest.
+pub fn apply_feature_flags(manifest: &mut Manifest) -> Result<()> {
+    // Reset flags
+    manifest.reader_feature_flags = 0;
+    manifest.writer_feature_flags = 0;
+
+    let has_deletion_files = manifest
+        .fragments
+        .iter()
+        .any(|frag| frag.deletion_file.is_some());
+    if has_deletion_files {
+        // Both readers and writers need to be able to read deletion files
+        manifest.reader_feature_flags |= FLAG_DELETION_FILES;
+        manifest.writer_feature_flags |= FLAG_DELETION_FILES;
+    }
+
+    // If any fragment has row ids, they must all have row ids.
+    let has_row_ids = manifest
+        .fragments
+        .iter()
+        .any(|frag| frag.row_id_meta.is_some());
+    if has_row_ids {
+        if !manifest
+            .fragments
+            .iter()
+            .all(|frag| frag.row_id_meta.is_some())
+        {
+            return Err(Error::invalid_input(
+                "All fragments must have row ids",
+                location!(),
+            ));
+        }
+        manifest.reader_feature_flags |= FLAG_MOVE_STABLE_ROW_IDS;
+        manifest.writer_feature_flags |= FLAG_MOVE_STABLE_ROW_IDS;
+    }
+
+    Ok(())
+}
+
+pub fn can_read_dataset(reader_flags: u64) -> bool {
+    reader_flags <= 3
+}
+
+pub fn can_write_dataset(writer_flags: u64) -> bool {
+    writer_flags <= 3
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_read_check() {
+        assert!(can_read_dataset(0));
+        assert!(can_read_dataset(super::FLAG_DELETION_FILES));
+        assert!(can_read_dataset(super::FLAG_MOVE_STABLE_ROW_IDS));
+        assert!(can_read_dataset(
+            super::FLAG_DELETION_FILES | super::FLAG_MOVE_STABLE_ROW_IDS
+        ));
+        assert!(!can_read_dataset(super::FLAG_MOVE_STABLE_ROW_IDS << 1));
+    }
+
+    #[test]
+    fn test_write_check() {
+        assert!(can_write_dataset(0));
+        assert!(can_write_dataset(super::FLAG_DELETION_FILES));
+        assert!(can_write_dataset(super::FLAG_MOVE_STABLE_ROW_IDS));
+        assert!(can_write_dataset(
+            super::FLAG_DELETION_FILES | super::FLAG_MOVE_STABLE_ROW_IDS
+        ));
+        assert!(!can_write_dataset(super::FLAG_MOVE_STABLE_ROW_IDS << 1));
+    }
+}
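
Editorial note (not part of the patch): the flag words are bitsets, so
``reader_flags <= 3`` is a compact way of saying "no bits other than
``FLAG_DELETION_FILES`` (1) and ``FLAG_MOVE_STABLE_ROW_IDS`` (2) are set". An
equivalent, more explicit form of the same check would be::

    const KNOWN_FLAGS: u64 = FLAG_DELETION_FILES | FLAG_MOVE_STABLE_ROW_IDS;

    fn can_read_dataset(reader_flags: u64) -> bool {
        (reader_flags & !KNOWN_FLAGS) == 0
    }
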
diff --git a/rust/lance-table/src/format/fragment.rs b/rust/lance-table/src/format/fragment.rs
index 3ab6d43488..942d6ed2cd 100644
--- a/rust/lance-table/src/format/fragment.rs
+++ b/rust/lance-table/src/format/fragment.rs
@@ -170,6 +170,38 @@ impl TryFrom<pb::DeletionFile> for DeletionFile {
     }
 }
 
+/// A reference to a part of a file.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct ExternalFile {
+    pub path: String,
+    pub offset: u64,
+    pub size: u64,
+}
+
+/// Metadata about location of the row id sequence.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub enum RowIdMeta {
+    Inline(Vec<u8>),
+    External(ExternalFile),
+}
+
+impl TryFrom<pb::data_fragment::RowIdSequence> for RowIdMeta {
+    type Error = Error;
+
+    fn try_from(value: pb::data_fragment::RowIdSequence) -> Result<Self> {
+        match value {
+            pb::data_fragment::RowIdSequence::InlineRowIds(data) => Ok(Self::Inline(data)),
+            pb::data_fragment::RowIdSequence::ExternalRowIds(file) => {
+                Ok(Self::External(ExternalFile {
+                    path: file.path.clone(),
+                    offset: file.offset,
+                    size: file.size,
+                }))
+            }
+        }
+    }
+}
+
 /// Data fragment.
 ///
 /// A fragment is a set of files which represent the different columns of the same rows.
@@ -186,6 +218,10 @@ pub struct Fragment {
     #[serde(skip_serializing_if = "Option::is_none")]
     pub deletion_file: Option<DeletionFile>,
 
+    /// Metadata for this fragment's row id sequence, if any.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub row_id_meta: Option<RowIdMeta>,
+
     /// Original number of rows in the fragment. If this is None, then it is
     /// unknown. This is only optional for legacy reasons. All new tables should
     /// have this set.
@@ -198,6 +234,7 @@ impl Fragment {
             id,
             files: vec![],
             deletion_file: None,
+            row_id_meta: None,
             physical_rows: None,
         }
     }
@@ -235,6 +272,7 @@ impl Fragment {
             files: vec![DataFile::new_legacy(path, schema)],
             deletion_file: None,
             physical_rows,
+            row_id_meta: None,
         }
     }
 
@@ -280,8 +318,9 @@ impl TryFrom<pb::DataFragment> for Fragment {
                 .files
                 .into_iter()
                 .map(DataFile::try_from)
-                .collect::<Result<Vec<_>>>()?,
+                .collect::<Result<_>>()?,
             deletion_file: p.deletion_file.map(DeletionFile::try_from).transpose()?,
+            row_id_meta: p.row_id_sequence.map(RowIdMeta::try_from).transpose()?,
             physical_rows,
         })
     }
@@ -301,10 +340,23 @@ impl From<&Fragment> for pb::DataFragment {
                 num_deleted_rows: f.num_deleted_rows.unwrap_or_default() as u64,
             }
         });
+
+        let row_id_sequence = f.row_id_meta.as_ref().map(|m| match m {
+            RowIdMeta::Inline(data) => pb::data_fragment::RowIdSequence::InlineRowIds(data.clone()),
+            RowIdMeta::External(file) => {
+                pb::data_fragment::RowIdSequence::ExternalRowIds(pb::ExternalFile {
+                    path: file.path.clone(),
+                    offset: file.offset,
+                    size: file.size,
+                })
+            }
+        });
+
         Self {
             id: f.id,
             files: f.files.iter().map(pb::DataFile::from).collect(),
             deletion_file,
+            row_id_sequence,
             physical_rows: f.physical_rows.unwrap_or_default() as u64,
         }
     }
diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs
index d6f9aa57f8..89ac0350c9 100644
--- a/rust/lance-table/src/format/manifest.rs
+++ b/rust/lance-table/src/format/manifest.rs
@@ -13,6 +13,7 @@ use object_store::path::Path;
 use prost_types::Timestamp;
 
 use super::Fragment;
+use crate::feature_flags::FLAG_MOVE_STABLE_ROW_IDS;
 use crate::format::pb;
 use lance_core::cache::FileMetadataCache;
 use lance_core::datatypes::Schema;
@@ -68,6 +69,9 @@ pub struct Manifest {
     /// Precomputed logic offset of each fragment
     /// accelerating the fragment search using offset ranges.
     fragment_offsets: Vec<usize>,
+
+    /// The next unused row id, i.e. one past the max row id assigned so far.
+    pub next_row_id: u64,
 }
 
 fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
@@ -100,6 +104,7 @@ impl Manifest {
             max_fragment_id: 0,
             transaction_file: None,
             fragment_offsets,
+            next_row_id: 0,
         }
     }
 
@@ -124,6 +129,7 @@ impl Manifest {
             max_fragment_id: previous.max_fragment_id,
             transaction_file: None,
             fragment_offsets,
+            next_row_id: previous.next_row_id,
         }
     }
 
@@ -330,6 +336,7 @@ impl ProtoStruct for Manifest {
 
 impl TryFrom<pb::Manifest> for Manifest {
     type Error = Error;
+
     fn try_from(p: pb::Manifest) -> Result<Self> {
         let timestamp_nanos = p.timestamp.map(|ts| {
             let sec = ts.seconds as u128 * 1e9 as u128;
@@ -354,6 +361,16 @@ impl TryFrom<pb::Manifest> for Manifest {
             fields: Fields(p.fields),
             metadata: p.metadata,
         };
+
+        if FLAG_MOVE_STABLE_ROW_IDS & p.reader_feature_flags != 0
+            && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
+        {
+            return Err(Error::Internal {
+                message: "All fragments must have row ids".into(),
+                location: location!(),
+            });
+        }
+
         Ok(Self {
             schema: Schema::from(fields_with_meta),
             version: p.version,
@@ -372,6 +389,7 @@ impl TryFrom<pb::Manifest> for Manifest {
                 Some(p.transaction_file)
             },
             fragment_offsets,
+            next_row_id: p.next_row_id,
         })
     }
 }
@@ -409,6 +427,7 @@ impl From<&Manifest> for pb::Manifest {
             writer_feature_flags: m.writer_feature_flags,
             max_fragment_id: m.max_fragment_id,
             transaction_file: m.transaction_file.clone().unwrap_or_default(),
+            next_row_id: m.next_row_id,
         }
     }
 }
@@ -569,6 +588,7 @@ mod tests {
                 id: 0,
                 files: vec![DataFile::new_legacy_from_fields("path1", vec![0, 1, 2])],
                 deletion_file: None,
+                row_id_meta: None,
                 physical_rows: None,
             },
             Fragment {
@@ -578,6 +598,7 @@ mod tests {
                     DataFile::new_legacy_from_fields("path3", vec![2]),
                 ],
                 deletion_file: None,
+                row_id_meta: None,
                 physical_rows: None,
             },
         ];
diff --git a/rust/lance-table/src/lib.rs b/rust/lance-table/src/lib.rs
index b429f3c4e4..ebe892ba53 100644
--- a/rust/lance-table/src/lib.rs
+++ b/rust/lance-table/src/lib.rs
@@ -1,6 +1,7 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright The Lance Authors
 
+pub mod feature_flags;
 pub mod format;
 pub mod io;
 pub mod rowids;
diff --git a/rust/lance-table/src/rowids.rs b/rust/lance-table/src/rowids.rs
index 0f360c3a76..64ceeb2ee8 100644
--- a/rust/lance-table/src/rowids.rs
+++ b/rust/lance-table/src/rowids.rs
@@ -22,6 +22,7 @@ mod index;
 mod segment;
 mod serde;
 
+use deepsize::DeepSizeOf;
 // These are the public API.
 pub use index::RowIdIndex;
 pub use serde::{read_row_ids, write_row_ids};
@@ -40,7 +41,7 @@ use segment::U64Segment;
 /// contiguous or sorted.
 ///
 /// We can make optimizations that assume uniqueness.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, DeepSizeOf)]
 pub struct RowIdSequence(Vec<U64Segment>);
 
 impl std::fmt::Display for RowIdSequence {
diff --git a/rust/lance-table/src/rowids/bitmap.rs b/rust/lance-table/src/rowids/bitmap.rs
index 8c92d2da11..dc628ddcf8 100644
--- a/rust/lance-table/src/rowids/bitmap.rs
+++ b/rust/lance-table/src/rowids/bitmap.rs
@@ -1,7 +1,9 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright The Lance Authors
 
-#[derive(PartialEq, Eq, Clone)]
+use deepsize::DeepSizeOf;
+
+#[derive(PartialEq, Eq, Clone, DeepSizeOf)]
 pub struct Bitmap {
     pub data: Vec<u8>,
     pub len: usize,
diff --git a/rust/lance-table/src/rowids/encoded_array.rs b/rust/lance-table/src/rowids/encoded_array.rs
index 879b4eddf6..8ecbfe852f 100644
--- a/rust/lance-table/src/rowids/encoded_array.rs
+++ b/rust/lance-table/src/rowids/encoded_array.rs
@@ -3,10 +3,12 @@
 
 use std::ops::Range;
 
+use deepsize::DeepSizeOf;
+
 /// Encoded array of u64 values.
 ///
 /// This is an internal data type used as part of row id indices.
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
 pub enum EncodedU64Array {
     /// u64 values represented as u16 offset from a base value.
     ///
diff --git a/rust/lance-table/src/rowids/index.rs b/rust/lance-table/src/rowids/index.rs
index 6159a0abb8..379e19e37a 100644
--- a/rust/lance-table/src/rowids/index.rs
+++ b/rust/lance-table/src/rowids/index.rs
@@ -3,6 +3,7 @@
 
 use std::ops::RangeInclusive;
 
+use deepsize::DeepSizeOf;
 use lance_core::utils::address::RowAddress;
 use lance_core::{Error, Result};
 use rangemap::RangeInclusiveMap;
@@ -57,6 +58,20 @@ impl RowIdIndex {
     }
 }
 
+impl DeepSizeOf for RowIdIndex {
+    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
+        self.0
+            .iter()
+            .map(|(_, (row_id_segment, address_segment))| {
+                (2 * std::mem::size_of::<u64>())
+                    + std::mem::size_of::<(U64Segment, U64Segment)>()
+                    + row_id_segment.deep_size_of_children(context)
+                    + address_segment.deep_size_of_children(context)
+            })
+            .sum()
+    }
+}
+
 fn decompose_sequence(
     fragment_id: u32,
     sequence: &RowIdSequence,
diff --git a/rust/lance-table/src/rowids/segment.rs b/rust/lance-table/src/rowids/segment.rs
index c9ffd6b1dc..84e6498cda 100644
--- a/rust/lance-table/src/rowids/segment.rs
+++ b/rust/lance-table/src/rowids/segment.rs
@@ -3,6 +3,8 @@
 
 use std::ops::{Range, RangeInclusive};
 
+use deepsize::DeepSizeOf;
+
 use super::{bitmap::Bitmap, encoded_array::EncodedU64Array};
 
 /// Different ways to represent a sequence of distinct u64s.
@@ -61,6 +63,18 @@ pub enum U64Segment {
     Array(EncodedU64Array),
 }
 
+impl DeepSizeOf for U64Segment {
+    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
+        match self {
+            Self::Range(_) => 0,
+            Self::RangeWithHoles { holes, .. } => holes.deep_size_of_children(context),
+            Self::RangeWithBitmap { bitmap, .. } => bitmap.deep_size_of_children(context),
+            Self::SortedArray(array) => array.deep_size_of_children(context),
+            Self::Array(array) => array.deep_size_of_children(context),
+        }
+    }
+}
+
 /// Statistics about a segment of u64s.
 #[derive(Debug)]
 struct SegmentStats {
diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs
index 68a51817c2..cb8d1e8e17 100644
--- a/rust/lance/src/dataset.rs
+++ b/rust/lance/src/dataset.rs
@@ -35,12 +35,12 @@ use tracing::instrument;
 
 pub mod builder;
 pub mod cleanup;
-mod feature_flags;
 pub mod fragment;
 mod hash_joiner;
 pub mod index;
 pub mod optimize;
 pub mod progress;
+mod rowids;
 pub mod scanner;
 mod schema_evolution;
 mod take;
@@ -51,7 +51,6 @@ mod write;
 
 use self::builder::DatasetBuilder;
 use self::cleanup::RemovalStats;
-use self::feature_flags::{apply_feature_flags, can_read_dataset, can_write_dataset};
 use self::fragment::FileFragment;
 use self::scanner::{DatasetRecordBatchStream, Scanner};
 use self::transaction::{Operation, Transaction};
@@ -64,6 +63,7 @@ use crate::utils::temporal::{timestamp_to_nanos, utc_now, SystemTime};
 use crate::{Error, Result};
 use hash_joiner::HashJoiner;
 pub use lance_core::ROW_ID;
+use lance_table::feature_flags::{apply_feature_flags, can_read_dataset, can_write_dataset};
 pub use schema_evolution::{
     BatchInfo, BatchUDF, ColumnAlteration, NewColumnTransform, UDFCheckpointStore,
 };
@@ -479,13 +479,17 @@ impl Dataset {
             None,
         );
 
+        let manifest_config = ManifestWriteConfig {
+            use_move_stable_row_ids: params.enable_move_stable_row_ids,
+            ..Default::default()
+        };
         let manifest = if let Some(dataset) = &dataset {
             commit_transaction(
                 dataset,
                 &object_store,
                 commit_handler.as_ref(),
                 &transaction,
-                &Default::default(),
+                &manifest_config,
                 &Default::default(),
             )
             .await?
         } else {
             commit_new_dataset(
                 &object_store,
                 commit_handler.as_ref(),
                 &base,
                 &transaction,
-                &Default::default(),
+                &manifest_config,
             )
             .await?
         };
@@ -1218,6 +1222,7 @@ impl Dataset {
 pub(crate) struct ManifestWriteConfig {
     auto_set_feature_flags: bool,          // default true
     timestamp: Option<SystemTime>,         // default None
+    use_move_stable_row_ids: bool,         // default false
 }
 
 impl Default for ManifestWriteConfig {
@@ -1225,6 +1230,7 @@ impl Default for ManifestWriteConfig {
         Self {
             auto_set_feature_flags: true,
             timestamp: None,
+            use_move_stable_row_ids: false,
         }
     }
 }
@@ -1239,7 +1245,7 @@ pub(crate) async fn write_manifest_file(
     config: &ManifestWriteConfig,
 ) -> std::result::Result<(), CommitError> {
     if config.auto_set_feature_flags {
-        apply_feature_flags(manifest);
+        apply_feature_flags(manifest)?;
     }
     manifest.set_timestamp(timestamp_to_nanos(config.timestamp));
 
@@ -1303,6 +1309,7 @@ mod tests {
     use lance_datagen::{array, gen, BatchCount, RowCount};
     use lance_index::{vector::DIST_COL, DatasetIndexExt, IndexType};
     use lance_linalg::distance::MetricType;
+    use lance_table::feature_flags;
     use lance_table::format::WriterVersion;
     use lance_table::io::commit::RenameCommitHandler;
     use lance_table::io::deletion::read_deletion_file;
@@ -1705,6 +1712,7 @@ mod tests {
             &ManifestWriteConfig {
                 auto_set_feature_flags: false,
                 timestamp: None,
+                use_move_stable_row_ids: false,
             },
         )
         .await
diff --git a/rust/lance/src/dataset/feature_flags.rs b/rust/lance/src/dataset/feature_flags.rs
deleted file mode 100644
index d32147bd4e..0000000000
--- a/rust/lance/src/dataset/feature_flags.rs
+++ /dev/null
@@ -1,52 +0,0 @@
-// SPDX-License-Identifier: Apache-2.0
-// SPDX-FileCopyrightText: Copyright The Lance Authors
-
-// Feature flags
-
-use lance_table::format::Manifest;
-
-pub const FLAG_DELETION_FILES: u64 = 1;
-
-/// Set the reader and writer feature flags in the manifest based on the contents of the manifest.
-pub fn apply_feature_flags(manifest: &mut Manifest) {
-    // Reset flags
-    manifest.reader_feature_flags = 0;
-    manifest.writer_feature_flags = 0;
-
-    let has_deletion_files = manifest
-        .fragments
-        .iter()
-        .any(|frag| frag.deletion_file.is_some());
-    if has_deletion_files {
-        // Both readers and writers need to be able to read deletion files
-        manifest.reader_feature_flags |= FLAG_DELETION_FILES;
-        manifest.writer_feature_flags |= FLAG_DELETION_FILES;
-    }
-}
-
-pub fn can_read_dataset(reader_flags: u64) -> bool {
-    reader_flags <= 1
-}
-
-pub fn can_write_dataset(writer_flags: u64) -> bool {
-    writer_flags <= 1
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_read_check() {
-        assert!(can_read_dataset(0));
-        assert!(can_read_dataset(super::FLAG_DELETION_FILES));
-        assert!(!can_read_dataset(super::FLAG_DELETION_FILES + 1));
-    }
-
-    #[test]
-    fn test_write_check() {
-        assert!(can_write_dataset(0));
-        assert!(can_write_dataset(super::FLAG_DELETION_FILES));
-        assert!(!can_write_dataset(super::FLAG_DELETION_FILES + 1));
-    }
-}
diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs
index 955564fba4..ed09a7999b 100644
--- a/rust/lance/src/dataset/optimize.rs
+++ b/rust/lance/src/dataset/optimize.rs
@@ -929,12 +929,14 @@ mod tests {
                 id: 0,
                 files: Vec::new(),
                 deletion_file: None,
+                row_id_meta: None,
                 physical_rows: Some(5),
             },
             Fragment {
                 id: 3,
                 files: Vec::new(),
                 deletion_file: None,
+                row_id_meta: None,
                 physical_rows: Some(3),
             },
         ];
@@ -1060,6 +1062,7 @@ mod tests {
             id: 0,
             files: vec![],
             deletion_file: None,
+            row_id_meta: None,
            physical_rows: Some(0),
         };
         let single_bin = CandidateBin {
@@ -1566,18 +1569,21 @@ mod tests {
                 id: 0,
                 files: Vec::new(),
                 deletion_file: None,
+                row_id_meta: None,
                 physical_rows: Some(5),
             },
             Fragment {
                 id: 3,
                 files: Vec::new(),
                 deletion_file: None,
+                row_id_meta: None,
                 physical_rows: Some(3),
             },
             Fragment {
                 id: 1,
                 files: Vec::new(),
                 deletion_file: None,
+                row_id_meta: None,
                 physical_rows: Some(3),
             },
         ];
diff --git a/rust/lance/src/dataset/rowids.rs b/rust/lance/src/dataset/rowids.rs
new file mode 100644
index 0000000000..48c0f37f40
--- /dev/null
+++ b/rust/lance/src/dataset/rowids.rs
@@ -0,0 +1,253 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+use super::Dataset;
+use crate::{Error, Result};
+use futures::{StreamExt, TryStreamExt};
+use snafu::{location, Location};
+use std::sync::Arc;
+
+use lance_table::{
+    format::RowIdMeta,
+    rowids::{read_row_ids, RowIdIndex},
+};
+
+// TODO: remove allow unused once we start using this in query and take paths.
+#[allow(unused)]
+pub async fn get_row_id_index(dataset: &Dataset) -> Result<Arc<RowIdIndex>> {
+    // The path here isn't real; it's just used to prevent collisions in the cache.
+    let path = dataset
+        .base
+        .child("row_ids")
+        .child(dataset.manifest.version.to_string());
+    dataset
+        .session
+        .file_metadata_cache
+        .get_or_insert(&path, |_path| async { load_row_id_index(dataset).await })
+        .await
+}
+
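+// Editorial usage sketch (not part of this patch; the `RowAddress` accessors
+// shown are illustrative): resolve a row id to its current address via the
+// cached index, given an open `Dataset` with the move_stable_row_ids flag set.
+//
+//     let index = get_row_id_index(&dataset).await?;
+//     if let Some(address) = index.get(42) {
+//         // `address` packs (fragment id, row offset) into a single u64.
+//     }
+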
+async fn load_row_id_index(dataset: &Dataset) -> Result<RowIdIndex> {
+    let mut num_external = 0;
+    for fragment in dataset.manifest.fragments.iter() {
+        match fragment.row_id_meta {
+            Some(RowIdMeta::External(_)) => num_external += 1,
+            None => {
+                return Err(Error::Internal {
+                    message: "Missing row id meta".into(),
+                    location: location!(),
+                })
+            }
+            _ => {}
+        }
+    }
+
+    let mut external_files = Vec::with_capacity(num_external);
+    let mut inline_files = Vec::with_capacity(dataset.manifest.fragments.len() - num_external);
+    for fragment in dataset.manifest.fragments.iter() {
+        match &fragment.row_id_meta {
+            Some(RowIdMeta::External(file_slice)) => {
+                external_files.push((fragment.id as u32, file_slice))
+            }
+            Some(RowIdMeta::Inline(row_ids)) => inline_files.push((fragment.id as u32, row_ids)),
+            _ => {}
+        }
+    }
+
+    let mut sequences = Vec::with_capacity(dataset.manifest.fragments.len());
+    futures::stream::iter(external_files)
+        .map(|(id, file_slice)| async move {
+            let path = dataset.base.child(file_slice.path.as_str());
+            let range =
+                file_slice.offset as usize..(file_slice.offset as usize + file_slice.size as usize);
+            let data = dataset
+                .object_store
+                .open(&path)
+                .await?
+                .get_range(range)
+                .await?;
+            let sequence = read_row_ids(&data)?;
+            Ok::<_, Error>((id, sequence))
+        })
+        .buffer_unordered(num_cpus::get())
+        .try_for_each(|(id, sequence)| {
+            sequences.push((id, sequence));
+            futures::future::ready(Ok(()))
+        })
+        .await?;
+
+    for (id, row_ids) in inline_files {
+        let sequence = read_row_ids(row_ids)?;
+        sequences.push((id, sequence));
+    }
+
+    let index = RowIdIndex::new(&sequences)?;
+
+    Ok(index)
+}
+
+#[cfg(test)]
+mod test {
+    use crate::dataset::{UpdateBuilder, WriteMode, WriteParams};
+
+    use super::*;
+
+    use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator};
+    use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
+    use lance_core::utils::address::RowAddress;
+
+    #[tokio::test]
+    async fn test_empty_dataset_rowids() {
+        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+            "id",
+            DataType::Int32,
+            false,
+        )]));
+        let reader = RecordBatchIterator::new(vec![].into_iter().map(Ok), schema.clone());
+        let write_params = WriteParams {
+            enable_move_stable_row_ids: true,
+            ..Default::default()
+        };
+        let dataset = Dataset::write(reader, "memory://", Some(write_params))
+            .await
+            .unwrap();
+
+        let index = get_row_id_index(&dataset).await.unwrap();
+        assert!(index.get(0).is_none());
+
+        assert_eq!(dataset.manifest().next_row_id, 0);
+    }
+
+    #[tokio::test]
+    async fn test_new_row_ids() {
+        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+            "id",
+            DataType::Int32,
+            false,
+        )]));
+        let num_rows = 25u64;
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from_iter_values(0..num_rows as i32))],
+        )
+        .unwrap();
+        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+        let write_params = WriteParams {
+            enable_move_stable_row_ids: true,
+            max_rows_per_file: 10,
+            ..Default::default()
+        };
+        let dataset = Dataset::write(reader, "memory://", Some(write_params))
+            .await
+            .unwrap();
+
+        let index = get_row_id_index(&dataset).await.unwrap();
+
+        let found_addresses = (0..num_rows)
+            .map(|i| index.get(i).unwrap())
+            .collect::<Vec<_>>();
+        let expected_addresses = (0..num_rows)
+            .map(|i| {
+                let fragment_id = i / 10;
+                RowAddress::new_from_parts(fragment_id as u32, (i % 10) as u32)
+            })
+            .collect::<Vec<_>>();
+        assert_eq!(found_addresses, expected_addresses);
+
+        assert_eq!(dataset.manifest().next_row_id, num_rows);
+    }
+
+    #[tokio::test]
+    async fn test_row_ids_overwrite() {
+        // Validate we don't re-use row ids after overwriting a table.
+        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+            "id",
+            DataType::Int32,
+            false,
+        )]));
+        let num_rows = 10u64;
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from_iter_values(0..num_rows as i32))],
+        )
+        .unwrap();
+
+        let reader = RecordBatchIterator::new(vec![Ok(batch.clone())], schema.clone());
+        let write_params = WriteParams {
+            enable_move_stable_row_ids: true,
+            ..Default::default()
+        };
+        let temp_dir = tempfile::tempdir().unwrap();
+        let tmp_path = temp_dir.path().to_str().unwrap();
+        let dataset = Dataset::write(reader, tmp_path, Some(write_params))
+            .await
+            .unwrap();
+
+        assert_eq!(dataset.manifest().next_row_id, num_rows);
+
+        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+        let write_params = WriteParams {
+            mode: WriteMode::Overwrite,
+            ..Default::default()
+        };
+        let dataset = Dataset::write(reader, tmp_path, Some(write_params))
+            .await
+            .unwrap();
+
+        // Overwriting should NOT reset the row id counter.
+        assert_eq!(dataset.manifest().next_row_id, 2 * num_rows);
+
+        let index = get_row_id_index(&dataset).await.unwrap();
+        assert!(index.get(0).is_none());
+        assert!(index.get(num_rows).is_some());
+    }
+
+    #[tokio::test]
+    async fn test_row_ids_update() {
+        // Updated fragments get fresh row ids.
+        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+            "id",
+            DataType::Int32,
+            false,
+        )]));
+        let num_rows = 5u64;
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from_iter_values(0..num_rows as i32))],
+        )
+        .unwrap();
+
+        let reader = RecordBatchIterator::new(vec![Ok(batch.clone())], schema.clone());
+        let write_params = WriteParams {
+            enable_move_stable_row_ids: true,
+            ..Default::default()
+        };
+        let dataset = Dataset::write(reader, "memory://", Some(write_params))
+            .await
+            .unwrap();
+
+        assert_eq!(dataset.manifest().next_row_id, num_rows);
+
+        let dataset = UpdateBuilder::new(Arc::new(dataset))
+            .update_where("id = 3")
+            .unwrap()
+            .set("id", "100")
+            .unwrap()
+            .build()
+            .unwrap()
+            .execute()
+            .await
+            .unwrap();
+
+        let index = get_row_id_index(&dataset).await.unwrap();
+        assert!(index.get(0).is_some());
+        // The old row id still points at its original address.
+        assert_eq!(index.get(3), Some(RowAddress::new_from_parts(0, 3)));
+        // The updated row was assigned a fresh row id in the new fragment.
+        assert_eq!(index.get(5), Some(RowAddress::new_from_parts(1, 0)));
+    }
+
+    // TODO: test that compaction does the right thing.
+
+    // TODO: test that scan with row id produces correct values.
+}
diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs
index 09041e1b63..3ea9a536d8 100644
--- a/rust/lance/src/dataset/schema_evolution.rs
+++ b/rust/lance/src/dataset/schema_evolution.rs
@@ -797,6 +797,7 @@ mod test {
                 files: vec![],
                 id: 0,
                 deletion_file: None,
+                row_id_meta: None,
                 physical_rows: Some(50),
             }))
         } else {
diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs
index f21891d1dd..0f9271a0ac 100644
--- a/rust/lance/src/dataset/transaction.rs
+++ b/rust/lance/src/dataset/transaction.rs
@@ -43,20 +43,22 @@ use lance_io::object_store::ObjectStore;
 use lance_table::{
     format::{
         pb::{self, IndexMetadata},
-        Fragment, Index, Manifest,
+        Fragment, Index, Manifest, RowIdMeta,
     },
     io::{
         commit::CommitHandler,
         manifest::{read_manifest, read_manifest_indexes},
     },
+    rowids::{write_row_ids, RowIdSequence},
 };
 use object_store::path::Path;
 use roaring::RoaringBitmap;
 use snafu::{location, Location};
 use uuid::Uuid;
 
-use super::{feature_flags::apply_feature_flags, ManifestWriteConfig};
+use super::ManifestWriteConfig;
 use crate::utils::temporal::timestamp_to_nanos;
+use lance_table::feature_flags::{apply_feature_flags, FLAG_MOVE_STABLE_ROW_IDS};
 
 /// A change to a dataset that can be retried
 ///
@@ -382,6 +384,25 @@
         let mut final_fragments = Vec::new();
         let mut final_indices = current_indices;
 
+        let mut next_row_id = {
+            // Only use row ids if the feature flag is already set, or if this
+            // is a new dataset where the feature has been requested.
+            match (current_manifest, config.use_move_stable_row_ids) {
+                (Some(manifest), _)
+                    if manifest.reader_feature_flags & FLAG_MOVE_STABLE_ROW_IDS != 0 =>
+                {
+                    Some(manifest.next_row_id)
+                }
+                (None, true) => Some(0),
+                (_, false) => None,
+                (Some(_), true) => {
+                    return Err(Error::NotSupported {
+                        source: "Cannot enable stable row ids on existing dataset".into(),
+                        location: location!(),
+                    });
+                }
+            }
+        };
+
         let maybe_existing_fragments = current_manifest
             .map(|m| m.fragments.as_ref())
@@ -396,10 +417,13 @@
         match &self.operation {
             Operation::Append { ref fragments } => {
                 final_fragments.extend(maybe_existing_fragments?.clone());
-                final_fragments.extend(Self::fragments_with_ids(
-                    fragments.clone(),
-                    &mut fragment_id,
-                ));
+                let mut new_fragments =
+                    Self::fragments_with_ids(fragments.clone(), &mut fragment_id)
+                        .collect::<Vec<_>>();
+                if let Some(next_row_id) = &mut next_row_id {
+                    Self::assign_row_ids(next_row_id, new_fragments.as_mut_slice())?;
+                }
+                final_fragments.extend(new_fragments);
             }
             Operation::Delete {
                 ref updated_fragments,
@@ -432,16 +456,22 @@
                         Some(f.clone())
                     }
                 }));
-                final_fragments.extend(Self::fragments_with_ids(
-                    new_fragments.clone(),
-                    &mut fragment_id,
-                ));
+                let mut new_fragments =
+                    Self::fragments_with_ids(new_fragments.clone(), &mut fragment_id)
+                        .collect::<Vec<_>>();
+                if let Some(next_row_id) = &mut next_row_id {
+                    Self::assign_row_ids(next_row_id, new_fragments.as_mut_slice())?;
+                }
+                final_fragments.extend(new_fragments);
             }
             Operation::Overwrite { ref fragments, .. } => {
-                final_fragments.extend(Self::fragments_with_ids(
-                    fragments.clone(),
-                    &mut fragment_id,
-                ));
+                let mut new_fragments =
+                    Self::fragments_with_ids(fragments.clone(), &mut fragment_id)
+                        .collect::<Vec<_>>();
+                if let Some(next_row_id) = &mut next_row_id {
+                    Self::assign_row_ids(next_row_id, new_fragments.as_mut_slice())?;
+                }
+                final_fragments.extend(new_fragments);
                 final_indices = Vec::new();
             }
             Operation::Rewrite {
@@ -521,7 +551,7 @@
         manifest.tag.clone_from(&self.tag);
 
         if config.auto_set_feature_flags {
-            apply_feature_flags(&mut manifest);
+            apply_feature_flags(&mut manifest)?;
         }
         manifest.set_timestamp(timestamp_to_nanos(config.timestamp));
 
@@ -533,6 +563,10 @@
 
         manifest.transaction_file = Some(transaction_file_path.to_string());
 
+        if let Some(next_row_id) = next_row_id {
+            manifest.next_row_id = next_row_id;
+        }
+
         Ok((manifest, final_indices))
     }
 
@@ -663,6 +697,22 @@
         }
         Ok(())
     }
+
+    fn assign_row_ids(next_row_id: &mut u64, fragments: &mut [Fragment]) -> Result<()> {
+        for fragment in fragments {
+            let physical_rows = fragment.physical_rows.ok_or_else(|| Error::Internal {
+                message: "Fragment does not have physical rows".into(),
+                location: location!(),
+            })? as u64;
+            let row_ids = *next_row_id..(*next_row_id + physical_rows);
+            let sequence = RowIdSequence::from(row_ids);
+            // TODO: write to a separate file if large. Possibly share a file with other fragments.
+            let serialized = write_row_ids(&sequence);
+            fragment.row_id_meta = Some(RowIdMeta::Inline(serialized));
+            *next_row_id += physical_rows;
+        }
+        Ok(())
+    }
 }
 
 impl TryFrom<pb::Transaction> for Transaction {
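
An editorial note on ``assign_row_ids`` (illustration, not part of the patch):
because every fragment consumes ``physical_rows`` ids from a single counter,
each new fragment always receives one contiguous range, which is exactly the
case the ``Range`` segment encoding is optimized for. For example, with
``next_row_id = 100`` and two new fragments of 10 and 5 rows::

    fragment A (10 rows) -> row ids 100..110
    fragment B (5 rows)  -> row ids 110..115
    manifest.next_row_id -> 115
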
diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs
index 2d7a515fbe..ddb3371af2 100644
--- a/rust/lance/src/dataset/write.rs
+++ b/rust/lance/src/dataset/write.rs
@@ -101,6 +101,12 @@ pub struct WriteParams {
     /// Unless you are intentionally testing the v2 writer, you should leave this as false
     /// as the v2 writer is still experimental and not fully implemented.
     pub use_experimental_writer: bool,
+
+    /// Experimental: if set to true, the writer will use move-stable row ids.
+    /// These row ids are stable after compaction operations, but not after updates.
+    /// This makes compaction more efficient, since with stable row ids no
+    /// secondary indices need to be updated to point to new row ids.
+    pub enable_move_stable_row_ids: bool,
 }
 
 impl Default for WriteParams {
@@ -116,6 +122,7 @@ impl Default for WriteParams {
             progress: Arc::new(NoopFragmentWriteProgress::new()),
             commit_handler: None,
             use_experimental_writer: false,
+            enable_move_stable_row_ids: false,
         }
     }
 }
diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs
index c02069c34b..de7355b06a 100644
--- a/rust/lance/src/io/commit.rs
+++ b/rust/lance/src/io/commit.rs
@@ -856,6 +856,7 @@ mod tests {
                     DataFile::new_legacy_from_fields("unused", vec![9]),
                 ],
                 deletion_file: None,
+                row_id_meta: None,
                 physical_rows: None,
             },
             Fragment {
@@ -865,6 +866,7 @@ mod tests {
                     DataFile::new_legacy_from_fields("path3", vec![2]),
                 ],
                 deletion_file: None,
+                row_id_meta: None,
                 physical_rows: None,
             },
         ];
@@ -890,6 +892,7 @@ mod tests {
                 id: 0,
                 files: vec![DataFile::new_legacy_from_fields("path1", vec![0, 1, 10])],
                 deletion_file: None,
+                row_id_meta: None,
                 physical_rows: None,
             },
             Fragment {
@@ -899,6 +902,7 @@ mod tests {
                     DataFile::new_legacy_from_fields("path3", vec![10]),
                 ],
                 deletion_file: None,
+                row_id_meta: None,
                 physical_rows: None,
             },
         ];
diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs
index 4de5f71bbb..e60c45e11f 100644
--- a/rust/lance/src/utils/test.rs
+++ b/rust/lance/src/utils/test.rs
@@ -231,6 +231,7 @@ impl TestDatasetGenerator {
             id: 0,
             files,
             deletion_file: None,
+            row_id_meta: None,
             physical_rows: Some(batch.num_rows()),
         }
     }