Skip to content

Commit

Permalink
refactor: make proto conversion fallible and not copy (#2371)
Browse files Browse the repository at this point in the history
Minor refactor to allow conversions *from* proto structs to be fallible
and also consume the struct. This reduces some data copies, though it's
probably not all that impactful.
  • Loading branch information
wjones127 authored May 22, 2024
1 parent 0ad037f commit 959ea9c
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 71 deletions.
9 changes: 5 additions & 4 deletions rust/lance-file/src/format/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ impl From<&Metadata> for pb::Metadata {
}
}

impl From<pb::Metadata> for Metadata {
fn from(m: pb::Metadata) -> Self {
Self {
impl TryFrom<pb::Metadata> for Metadata {
type Error = Error;
fn try_from(m: pb::Metadata) -> Result<Self> {
Ok(Self {
batch_offsets: m.batch_offsets.clone(),
page_table_position: m.page_table_position as usize,
manifest_position: Some(m.manifest_position as usize),
Expand All @@ -70,7 +71,7 @@ impl From<pb::Metadata> for Metadata {
} else {
None
},
}
})
}
}

Expand Down
20 changes: 12 additions & 8 deletions rust/lance-io/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,13 @@ pub async fn read_message<M: Message + Default>(reader: &dyn Reader, pos: usize)
pub async fn read_struct<
'm,
M: Message + Default + 'static,
T: ProtoStruct<Proto = M> + From<M>,
T: ProtoStruct<Proto = M> + TryFrom<M, Error = Error>,
>(
reader: &dyn Reader,
pos: usize,
) -> Result<T> {
// Fetch the raw message `M` through `read_message`; any error it reports is propagated by `?`.
let msg = read_message::<M>(reader, pos).await?;
let obj = T::from(msg);
Ok(obj)
// The conversion takes `msg` by value and already yields `Result<T>`, so it is returned as-is.
T::try_from(msg)
}

pub async fn read_last_block(reader: &dyn Reader) -> Result<Bytes> {
Expand Down Expand Up @@ -165,11 +164,14 @@ pub fn read_message_from_buf<M: Message + Default>(buf: &Bytes) -> Result<M> {
}

/// Read a Protobuf-backed struct from a buffer.
pub fn read_struct_from_buf<M: Message + Default, T: ProtoStruct<Proto = M> + From<M>>(
pub fn read_struct_from_buf<
M: Message + Default,
T: ProtoStruct<Proto = M> + TryFrom<M, Error = Error>,
>(
buf: &Bytes,
) -> Result<T> {
// Obtain the raw message first; any error from `read_message_from_buf` is propagated by `?`.
let msg: M = read_message_from_buf(buf)?;
Ok(T::from(msg))
// `try_from` takes `msg` by value; its `Result<T>` is handed straight back to the caller.
T::try_from(msg)
}

#[cfg(test)]
Expand All @@ -184,6 +186,7 @@ mod tests {
object_writer::ObjectWriter,
traits::{ProtoStruct, WriteExt, Writer},
utils::read_struct,
Error, Result,
};

// Bytes is a prost::Message, since we don't have any .proto files in this crate we
Expand All @@ -201,9 +204,10 @@ mod tests {
}
}

impl From<Bytes> for BytesWrapper {
fn from(value: Bytes) -> Self {
Self(value)
// Wrapping the raw bytes never fails; this impl always returns `Ok`.
impl TryFrom<Bytes> for BytesWrapper {
    type Error = Error;

    fn try_from(bytes: Bytes) -> Result<Self> {
        let wrapped = Self(bytes);
        Ok(wrapped)
    }
}

Expand Down
61 changes: 37 additions & 24 deletions rust/lance-table/src/format/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,17 @@ impl From<&DataFile> for pb::DataFile {
}
}

impl From<&pb::DataFile> for DataFile {
fn from(proto: &pb::DataFile) -> Self {
Self::new(
&proto.path,
proto.fields.clone(),
proto.column_indices.clone(),
proto.file_major_version,
proto.file_minor_version,
)
impl TryFrom<pb::DataFile> for DataFile {
    type Error = Error;

    /// Builds a `DataFile` by moving each field out of the protobuf message.
    ///
    /// The message is taken by value, so no field is cloned. This
    /// conversion always returns `Ok`.
    fn try_from(proto: pb::DataFile) -> Result<Self> {
        // Take the message apart once, then reassemble the fields by name.
        let pb::DataFile {
            path,
            fields,
            column_indices,
            file_major_version,
            file_minor_version,
            ..
        } = proto;

        let data_file = Self {
            path,
            fields,
            column_indices,
            file_major_version,
            file_minor_version,
        };
        Ok(data_file)
    }
}

Expand Down Expand Up @@ -140,26 +142,31 @@ pub struct DeletionFile {
pub num_deleted_rows: Option<usize>,
}

// TODO: should we convert this to TryFrom and surface the error?
#[allow(clippy::fallible_impl_from)]
impl From<&pb::DeletionFile> for DeletionFile {
fn from(value: &pb::DeletionFile) -> Self {
impl TryFrom<pb::DeletionFile> for DeletionFile {
type Error = Error;

/// Builds a `DeletionFile` from an owned protobuf message.
fn try_from(value: pb::DeletionFile) -> Result<Self> {
// Map the numeric `file_type` onto the enum: 0 is `Array`, 1 is `Bitmap`.
let file_type = match value.file_type {
0 => DeletionFileType::Array,
1 => DeletionFileType::Bitmap,
_ => panic!("Invalid deletion file type"),
// Any other value is returned to the caller as `Error::NotSupported`.
_ => {
return Err(Error::NotSupported {
source: "Unknown deletion file type".into(),
location: location!(),
})
}
};
// A count of zero is stored as `None`; any other count is kept as `Some`.
let num_deleted_rows = if value.num_deleted_rows == 0 {
None
} else {
Some(value.num_deleted_rows as usize)
};
Self {
Ok(Self {
read_version: value.read_version,
id: value.id,
file_type,
num_deleted_rows,
}
})
}
}

Expand Down Expand Up @@ -258,19 +265,25 @@ impl Fragment {
}
}

impl From<&pb::DataFragment> for Fragment {
fn from(p: &pb::DataFragment) -> Self {
impl TryFrom<pb::DataFragment> for Fragment {
type Error = Error;

/// Builds a `Fragment` from an owned protobuf `DataFragment`.
fn try_from(p: pb::DataFragment) -> Result<Self> {
// Only a row count greater than zero is kept; otherwise the value is `None`.
let physical_rows = if p.physical_rows > 0 {
Some(p.physical_rows as usize)
} else {
None
};
Self {
Ok(Self {
id: p.id,
files: p.files.iter().map(DataFile::from).collect(),
deletion_file: p.deletion_file.as_ref().map(DeletionFile::from),
// Each data file is converted by value; collecting into `Result` stops at the first error.
files: p
.files
.into_iter()
.map(DataFile::try_from)
.collect::<Result<Vec<_>>>()?,
// `transpose` turns `Option<Result<_>>` into `Result<Option<_>>` so `?` can propagate a failed conversion.
deletion_file: p.deletion_file.map(DeletionFile::try_from).transpose()?,
physical_rows,
}
})
}
}

Expand Down Expand Up @@ -346,12 +359,12 @@ mod tests {
});

let proto = pb::DataFragment::from(&fragment);
let fragment2 = Fragment::from(&proto);
let fragment2 = Fragment::try_from(proto.clone()).unwrap();
assert_eq!(fragment, fragment2);

fragment.deletion_file = None;
let proto = pb::DataFragment::from(&fragment);
let fragment2 = Fragment::from(&proto);
let fragment2 = Fragment::try_from(proto).unwrap();
assert_eq!(fragment, fragment2);
}

Expand Down
8 changes: 4 additions & 4 deletions rust/lance-table/src/format/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ pub struct Index {
pub fragment_bitmap: Option<RoaringBitmap>,
}

impl TryFrom<&pb::IndexMetadata> for Index {
impl TryFrom<pb::IndexMetadata> for Index {
type Error = Error;

fn try_from(proto: &pb::IndexMetadata) -> Result<Self> {
fn try_from(proto: pb::IndexMetadata) -> Result<Self> {
let fragment_bitmap = if proto.fragment_bitmap.is_empty() {
None
} else {
Expand All @@ -50,8 +50,8 @@ impl TryFrom<&pb::IndexMetadata> for Index {
location!(),
)
})??,
name: proto.name.clone(),
fields: proto.fields.clone(),
name: proto.name,
fields: proto.fields,
dataset_version: proto.dataset_version,
fragment_bitmap,
})
Expand Down
16 changes: 11 additions & 5 deletions rust/lance-table/src/format/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,9 @@ impl ProtoStruct for Manifest {
type Proto = pb::Manifest;
}

impl From<pb::Manifest> for Manifest {
fn from(p: pb::Manifest) -> Self {
impl TryFrom<pb::Manifest> for Manifest {
type Error = Error;
fn try_from(p: pb::Manifest) -> Result<Self> {
let timestamp_nanos = p.timestamp.map(|ts| {
let sec = ts.seconds as u128 * 1e9 as u128;
let nanos = ts.nanos as u128;
Expand All @@ -342,13 +343,18 @@ impl From<pb::Manifest> for Manifest {
}
_ => None,
};
let fragments = Arc::new(p.fragments.iter().map(Fragment::from).collect::<Vec<_>>());
let fragments = Arc::new(
p.fragments
.into_iter()
.map(Fragment::try_from)
.collect::<Result<Vec<_>>>()?,
);
let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
let fields_with_meta = FieldsWithMeta {
fields: Fields(p.fields),
metadata: p.metadata,
};
Self {
Ok(Self {
schema: Schema::from(fields_with_meta),
version: p.version,
writer_version,
Expand All @@ -366,7 +372,7 @@ impl From<pb::Manifest> for Manifest {
Some(p.transaction_file)
},
fragment_offsets,
}
})
}
}

Expand Down
4 changes: 2 additions & 2 deletions rust/lance-table/src/io/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub async fn read_manifest(object_store: &ObjectStore, path: &Path) -> Result<Ma
}

let proto = pb::Manifest::decode(buf)?;
Ok(Manifest::from(proto))
Manifest::try_from(proto)
}

#[instrument(level = "debug", skip(object_store, manifest))]
Expand All @@ -105,7 +105,7 @@ pub async fn read_manifest_indexes(

Ok(section
.indices
.iter()
.into_iter()
.map(Index::try_from)
.collect::<Result<Vec<_>>>()?)
} else {
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ impl Dataset {
};
let data = self.object_store.inner.get(&path).await?.bytes().await?;
let transaction = lance_table::format::pb::Transaction::decode(data)?;
Transaction::try_from(&transaction).map(Some)
Transaction::try_from(transaction).map(Some)
}

/// Restore the currently checked out version of the dataset as the latest version.
Expand Down
Loading

0 comments on commit 959ea9c

Please sign in to comment.