feat: stable row id manifest changes (#2363)
* Adds stable row ids to manifest
* Support writing row id sequences for append, overwrite, and update.

Epic: #2307
wjones127 authored May 31, 2024
1 parent fbf3800 commit 2d68062
Showing 23 changed files with 666 additions and 81 deletions.
80 changes: 80 additions & 0 deletions docs/format.rst
@@ -403,3 +403,83 @@ compatible.
However, writers should not write extra fields that aren't described in this
document. Until they are defined in the specification, there is no guarantee that
readers will be able to safely interpret new forms of statistics.


Feature: Move-Stable Row IDs
----------------------------

The row id feature assigns a unique u64 id to each row in the table. This id is
stable after the row is moved (such as during compaction), but is not necessarily
stable after the row is updated. (A future feature may make row ids stable after
updates.) To make access fast, a secondary index is created that maps row ids to
their locations in the table. Each fragment's part of this index (its row id
sequence) is stored in that fragment's metadata.

row id
A unique auto-incrementing u64 id assigned to each row in the table.

row address
The current location of a row in the table. This is a u64 that can be thought
of as a pair of two u32 values: the fragment id and the local row offset. For
example, if the row address is (42, 9), then the row is in the 42nd fragment
and is the 10th row in that fragment. (A packing sketch follows these definitions.)

row id sequence
The sequence of row ids in a fragment.

row id index
A secondary index that maps row ids to row addresses. This index is constructed
by reading all the row id sequences.
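
As a non-normative illustration, a row address can be packed into a single u64
by placing the fragment id in the upper 32 bits and the local row offset in the
lower 32 bits (the exact packing here is an assumption of the sketch, not a
requirement of this section):

.. code-block:: rust

   // Sketch only: assumes the fragment id occupies the upper 32 bits and the
   // local row offset the lower 32 bits of the row address.
   fn pack_row_address(fragment_id: u32, local_offset: u32) -> u64 {
       ((fragment_id as u64) << 32) | local_offset as u64
   }

   fn unpack_row_address(address: u64) -> (u32, u32) {
       ((address >> 32) as u32, address as u32)
   }

For example, ``pack_row_address(42, 9)`` is the address of the 10th row in
fragment 42.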

Assigning row ids
~~~~~~~~~~~~~~~~~

Row ids are assigned in a monotonically increasing sequence. The next row id is
stored in the manifest as the field ``next_row_id``. This starts at zero. When
making a commit, the writer uses that field to assign row ids to new fragments.
If the commit fails due to a conflict, the writer re-reads the latest
``next_row_id``, re-assigns the new row ids, and then tries again. This is
similar to how ``max_fragment_id`` is used to assign new fragment ids.
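
A minimal sketch of this assignment, using a hypothetical ``NewFragment`` type
rather than the actual Lance writer structures:

.. code-block:: rust

   // Hypothetical type for illustration only.
   struct NewFragment {
       num_rows: u64,
       row_id_range: Option<std::ops::Range<u64>>,
   }

   // Assign contiguous row id ranges starting at the manifest's next_row_id and
   // return the value to record as the new next_row_id. If the commit conflicts,
   // this is simply re-run with the freshly read next_row_id.
   fn assign_row_ids(next_row_id: u64, fragments: &mut [NewFragment]) -> u64 {
       let mut next = next_row_id;
       for frag in fragments.iter_mut() {
           frag.row_id_range = Some(next..next + frag.num_rows);
           next += frag.num_rows;
       }
       next
   }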

When a row is updated, it is typically assigned a new row id rather than
reusing the old one. This is because this feature doesn't have a mechanism to
update secondary indices that may still reference the old values for the row id.
By deleting the old row id and creating a new one, the secondary indices will
avoid referencing stale data.

Row ID sequences
~~~~~~~~~~~~~~~~

The row id values for a fragment are stored in a ``RowIdSequence`` protobuf
message. This is described in the `protos/rowids.proto`_ file. Row id sequences
are just arrays of u64 values, which have representations optimized for the
common case where they are sorted and possibly contiguous. For example, a new
fragment will have a row id sequence that is just a simple range, so it is
stored as a ``start`` and ``end`` value.

These sequence messages are either stored inline in the fragment metadata, or
are written to a separate file and referenced from the fragment metadata. This
choice is typically made based on the size of the sequence. If the sequence is
small, it is stored inline. If it is large, it is written to a separate file. By
keeping the small sequences inline, we can avoid the overhead of additional IO
operations.
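
A sketch of this decision, mirroring the ``RowIdMeta`` and ``ExternalFile`` types
added in this commit (the threshold is a writer choice; the protobuf comment
suggests keeping sequences under 200KB inline):

.. code-block:: rust

   struct ExternalFile { path: String, offset: u64, size: u64 }

   enum RowIdMeta {
       Inline(Vec<u8>),
       External(ExternalFile),
   }

   fn store_sequence(serialized: Vec<u8>, path: &str) -> RowIdMeta {
       const INLINE_LIMIT: usize = 200 * 1024;
       if serialized.len() < INLINE_LIMIT {
           RowIdMeta::Inline(serialized)
       } else {
           let size = serialized.len() as u64;
           // Writing `serialized` out to `path` is elided in this sketch.
           RowIdMeta::External(ExternalFile { path: path.to_string(), offset: 0, size })
       }
   }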

.. literalinclude:: ../protos/table.proto
:language: protobuf
:start-at: oneof row_id_sequence {
:end-at: } // row_id_sequence
:caption: `protos/table.proto`_

Row ID index
~~~~~~~~~~~~

To ensure fast access to rows by their row id, a secondary index is created that
maps row ids to their locations in the table. This index is built when a table is
loaded, based on the row id sequences in the fragments. For example, if fragment
42 has a row id sequence of ``[0, 63, 10]``, then the index will have entries for
``0 -> (42, 0)``, ``63 -> (42, 1)``, and ``10 -> (42, 2)``. The exact form of this
index is left up to the implementation, but it should be optimized for fast lookups.
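
A minimal sketch of this construction; a real implementation may use a more
compact structure than a hash map:

.. code-block:: rust

   use std::collections::HashMap;

   // Build a map from row id to (fragment id, local offset) by walking each
   // fragment's row id sequence in order.
   fn build_row_id_index(sequences: &[(u32, Vec<u64>)]) -> HashMap<u64, (u32, u32)> {
       let mut index = HashMap::new();
       for (fragment_id, row_ids) in sequences {
           for (offset, row_id) in row_ids.iter().enumerate() {
               index.insert(*row_id, (*fragment_id, offset as u32));
           }
       }
       index
   }

With the example above, ``build_row_id_index(&[(42, vec![0, 63, 10])])`` produces
``0 -> (42, 0)``, ``63 -> (42, 1)``, and ``10 -> (42, 2)``.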

.. _protos/table.proto: https://github.com/lancedb/lance/blob/main/protos/table.proto
.. _protos/rowids.proto: https://github.com/lancedb/lance/blob/main/protos/rowids.proto

6 changes: 3 additions & 3 deletions protos/rowids.proto
@@ -21,7 +21,7 @@ message U64Segment {
message Range {
/// The start of the range, inclusive.
uint64 start = 1;
/// The enc of the range, exclusive.
/// The end of the range, exclusive.
uint64 end = 2;
}

@@ -43,7 +43,7 @@ message U64Segment {
message RangeWithBitmap {
/// The start of the range, inclusive.
uint64 start = 1;
/// The enc of the range, exclusive.
/// The end of the range, exclusive.
uint64 end = 2;
/// A bitmap of the values in the range. The bitmap is a sequence of bytes,
/// where each byte represents 8 values. The first byte represents values
@@ -67,7 +67,7 @@ message U64Segment {
/// A general array of values, which is not sorted.
EncodedU64Array array = 5;
}
}
} // RowIdSegment

/// A basic bitpacked array of u64 values.
message EncodedU64Array {
29 changes: 29 additions & 0 deletions protos/table.proto
@@ -83,6 +83,8 @@ message Manifest {
//
// Known flags:
// * 1: deletion files are present
// * 2: move_stable_row_ids: row IDs are tracked and stable after move operations
// (such as compaction), but not updates.
uint64 reader_feature_flags = 9;

// Feature flags for writers.
@@ -114,6 +116,11 @@ message Manifest {
// version of the table the transaction read from, and {uuid} is a
// hyphen-separated UUID.
string transaction_file = 12;

// The next unused row id. If zero, then the table does not have any rows.
//
// This is only used if the "move_stable_row_ids" feature flag is set.
uint64 next_row_id = 14;
} // Manifest

// Auxiliary Data attached to a version.
@@ -167,6 +174,20 @@ message DataFragment {
// File that indicates which rows, if any, should be considered deleted.
DeletionFile deletion_file = 3;

// TODO: What's the simplest way we can allow an inline tombstone bitmap?

// A serialized RowIdSequence message (see rowids.proto).
//
// These are the row ids for the fragment, in order of the rows as they appear.
// That is, if a fragment has 3 rows, and the row ids are [1, 42, 3], then the
// first row is row 1, the second row is row 42, and the third row is row 3.
oneof row_id_sequence {
// If small (< 200KB), the row ids are stored inline.
bytes inline_row_ids = 5;
// Otherwise, stored as part of a file.
ExternalFile external_row_ids = 6;
} // row_id_sequence

// Number of original rows in the fragment, this includes rows that are
// now marked with deletion tombstones. To compute the current number of rows,
// subtract `deletion_file.num_deleted_rows` from this value.
@@ -273,3 +294,11 @@ message DeletionFile {
uint64 num_deleted_rows = 4;
} // DeletionFile

message ExternalFile {
// Path to the file, relative to the root of the table.
string path = 1;
// The offset in the file where the data starts.
uint64 offset = 2;
// The size of the data in the file.
uint64 size = 3;
}
2 changes: 1 addition & 1 deletion python/python/lance/fragment.py
@@ -453,7 +453,7 @@ def delete(self, predicate: str) -> FragmentMetadata | None:
>>> dataset = lance.write_dataset(tab, "dataset")
>>> frag = dataset.get_fragment(0)
>>> frag.delete("a > 1")
Fragment { id: 0, files: ..., deletion_file: Some(...), physical_rows: Some(3) }
Fragment { id: 0, files: ..., deletion_file: Some(...), ...}
>>> frag.delete("a > 0") is None
True
2 changes: 1 addition & 1 deletion python/python/tests/test_fragment.py
@@ -246,7 +246,7 @@ def test_fragment_meta():
"column_indices: [], file_major_version: 0, file_minor_version: 0 }, "
'DataFile { path: "1.lance", fields: [1], column_indices: [], '
"file_major_version: 0, file_minor_version: 0 }], deletion_file: None, "
"physical_rows: Some(100) }"
"row_id_meta: None, physical_rows: Some(100) }"
)


90 changes: 90 additions & 0 deletions rust/lance-table/src/feature_flags.rs
@@ -0,0 +1,90 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Feature flags
use snafu::{location, Location};

use crate::format::Manifest;
use lance_core::{Error, Result};

/// Fragments may contain deletion files, which record the tombstones of
/// soft-deleted rows.
pub const FLAG_DELETION_FILES: u64 = 1;
/// Row ids are stable after moves, but not updates. Fragments contain an index
/// mapping row ids to row addresses.
pub const FLAG_MOVE_STABLE_ROW_IDS: u64 = 2;

/// Set the reader and writer feature flags in the manifest based on the contents of the manifest.
pub fn apply_feature_flags(manifest: &mut Manifest) -> Result<()> {
// Reset flags
manifest.reader_feature_flags = 0;
manifest.writer_feature_flags = 0;

let has_deletion_files = manifest
.fragments
.iter()
.any(|frag| frag.deletion_file.is_some());
if has_deletion_files {
// Both readers and writers need to be able to read deletion files
manifest.reader_feature_flags |= FLAG_DELETION_FILES;
manifest.writer_feature_flags |= FLAG_DELETION_FILES;
}

// If any fragment has row ids, they must all have row ids.
let has_row_ids = manifest
.fragments
.iter()
.any(|frag| frag.row_id_meta.is_some());
if has_row_ids {
if !manifest
.fragments
.iter()
.all(|frag| frag.row_id_meta.is_some())
{
return Err(Error::invalid_input(
"All fragments must have row ids",
location!(),
));
}
manifest.reader_feature_flags |= FLAG_MOVE_STABLE_ROW_IDS;
manifest.writer_feature_flags |= FLAG_MOVE_STABLE_ROW_IDS;
}

Ok(())
}

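/// Returns true if this implementation can read a dataset with the given reader
/// feature flags, i.e. no flags beyond those defined above (1 and 2) are set.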
pub fn can_read_dataset(reader_flags: u64) -> bool {
reader_flags <= 3
}

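/// Returns true if this implementation can write a dataset with the given writer
/// feature flags, i.e. no flags beyond those defined above (1 and 2) are set.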
pub fn can_write_dataset(writer_flags: u64) -> bool {
writer_flags <= 3
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_read_check() {
assert!(can_read_dataset(0));
assert!(can_read_dataset(super::FLAG_DELETION_FILES));
assert!(can_read_dataset(super::FLAG_MOVE_STABLE_ROW_IDS));
assert!(can_read_dataset(
super::FLAG_DELETION_FILES | super::FLAG_MOVE_STABLE_ROW_IDS
));
assert!(!can_read_dataset(super::FLAG_MOVE_STABLE_ROW_IDS << 1));
}

#[test]
fn test_write_check() {
assert!(can_write_dataset(0));
assert!(can_write_dataset(super::FLAG_DELETION_FILES));
assert!(can_write_dataset(super::FLAG_MOVE_STABLE_ROW_IDS));
assert!(can_write_dataset(
super::FLAG_DELETION_FILES | super::FLAG_MOVE_STABLE_ROW_IDS
));
assert!(!can_write_dataset(super::FLAG_MOVE_STABLE_ROW_IDS << 1));
}
}
54 changes: 53 additions & 1 deletion rust/lance-table/src/format/fragment.rs
@@ -170,6 +170,38 @@ impl TryFrom<pb::DeletionFile> for DeletionFile {
}
}

/// A reference to a part of a file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ExternalFile {
pub path: String,
pub offset: u64,
pub size: u64,
}

/// Metadata about location of the row id sequence.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RowIdMeta {
Inline(Vec<u8>),
External(ExternalFile),
}

impl TryFrom<pb::data_fragment::RowIdSequence> for RowIdMeta {
type Error = Error;

fn try_from(value: pb::data_fragment::RowIdSequence) -> Result<Self> {
match value {
pb::data_fragment::RowIdSequence::InlineRowIds(data) => Ok(Self::Inline(data)),
pb::data_fragment::RowIdSequence::ExternalRowIds(file) => {
Ok(Self::External(ExternalFile {
path: file.path.clone(),
offset: file.offset,
size: file.size,
}))
}
}
}
}

/// Data fragment.
///
/// A fragment is a set of files which represent the different columns of the same rows.
@@ -186,6 +218,10 @@ pub struct Fragment {
#[serde(skip_serializing_if = "Option::is_none")]
pub deletion_file: Option<DeletionFile>,

/// Row id sequence metadata (inline or external).
#[serde(skip_serializing_if = "Option::is_none")]
pub row_id_meta: Option<RowIdMeta>,

/// Original number of rows in the fragment. If this is None, then it is
/// unknown. This is only optional for legacy reasons. All new tables should
/// have this set.
@@ -198,6 +234,7 @@ impl Fragment {
id,
files: vec![],
deletion_file: None,
row_id_meta: None,
physical_rows: None,
}
}
@@ -235,6 +272,7 @@ impl Fragment {
files: vec![DataFile::new_legacy(path, schema)],
deletion_file: None,
physical_rows,
row_id_meta: None,
}
}

@@ -280,8 +318,9 @@ impl TryFrom<pb::DataFragment> for Fragment {
.files
.into_iter()
.map(DataFile::try_from)
.collect::<Result<Vec<_>>>()?,
.collect::<Result<_>>()?,
deletion_file: p.deletion_file.map(DeletionFile::try_from).transpose()?,
row_id_meta: p.row_id_sequence.map(RowIdMeta::try_from).transpose()?,
physical_rows,
})
}
@@ -301,10 +340,23 @@ impl From<&Fragment> for pb::DataFragment {
num_deleted_rows: f.num_deleted_rows.unwrap_or_default() as u64,
}
});

let row_id_sequence = f.row_id_meta.as_ref().map(|m| match m {
RowIdMeta::Inline(data) => pb::data_fragment::RowIdSequence::InlineRowIds(data.clone()),
RowIdMeta::External(file) => {
pb::data_fragment::RowIdSequence::ExternalRowIds(pb::ExternalFile {
path: file.path.clone(),
offset: file.offset,
size: file.size,
})
}
});

Self {
id: f.id,
files: f.files.iter().map(pb::DataFile::from).collect(),
deletion_file,
row_id_sequence,
physical_rows: f.physical_rows.unwrap_or_default() as u64,
}
}