perf: optimize IO path for reading manifest (#2396)
Fixes #2338
Partially addresses #2318

**For a dataset on local SSD with 8,000 versions, we get 6x faster load
time and 3x faster append.**

* Added a special code path for finding the latest manifest on local
filesystems. This path skips the `metadata` call for paths that aren't
relevant, both fixing #2338 and improving performance on local
filesystems overall.
* Fixed a code path where we were reading the manifest file twice.
* Changed `CloudObjectReader` and `LocalFileReader` to both cache the
file size, so we aren't making multiple calls to get the size of the
same object/file. Also allowed passing the size when opening, in case we
already have it from a list operation.
* Deprecated some more methods for loading a dataset in favor of
`DatasetBuilder`, and consolidated the implementations to use
`DatasetBuilder`, so we have fewer code paths to worry about and test.
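The size-caching idea above can be sketched like this. This is a minimal synchronous sketch using `std::sync::OnceLock`; the real readers use `tokio::sync::OnceCell` so initialization can happen on an async path, and `CachedSizeReader` is an illustrative name, not the actual type:

```rust
use std::fs::File;
use std::sync::OnceLock;

/// Illustrative reader that caches the file size after the first lookup.
struct CachedSizeReader {
    file: File,
    size: OnceLock<u64>,
}

impl CachedSizeReader {
    /// If the size is already known (e.g. from a list operation),
    /// pass it in so the metadata call is skipped entirely.
    fn new(file: File, known_size: Option<u64>) -> Self {
        let size = OnceLock::new();
        if let Some(s) = known_size {
            let _ = size.set(s);
        }
        Self { file, size }
    }

    /// Returns the size, querying the filesystem at most once.
    fn size(&self) -> std::io::Result<u64> {
        if let Some(s) = self.size.get() {
            return Ok(*s);
        }
        let s = self.file.metadata()?.len();
        Ok(*self.size.get_or_init(|| s))
    }
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("cached_size_demo.bin");
    std::fs::write(&path, b"hello")?;
    let reader = CachedSizeReader::new(File::open(&path)?, None);
    assert_eq!(reader.size()?, 5); // first call hits the filesystem
    assert_eq!(reader.size()?, 5); // second call is served from cache
    // A size passed in at open time skips the metadata call entirely.
    let listed = CachedSizeReader::new(File::open(&path)?, Some(5));
    assert_eq!(listed.size()?, 5);
    Ok(())
}
```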

## TODO

* [x] Cleanup
* [x] Add IO unit test for loading a dataset
* [x] Check repro from #2318

---------

Co-authored-by: Weston Pace <[email protected]>
wjones127 and westonpace authored May 29, 2024
1 parent 0e62451 commit 9a0fa1f
Showing 17 changed files with 408 additions and 192 deletions.
5 changes: 1 addition & 4 deletions docs/format.rst
@@ -10,7 +10,6 @@ A `Lance Dataset` is organized in a directory.
/path/to/dataset:
data/*.lance -- Data directory
latest.manifest -- The manifest file for the latest version.
_versions/*.manifest -- Manifest file for each dataset version.
_indices/{UUID-*}/index.idx -- Secondary index, each index per directory.
_deletions/*.{arrow,bin} -- Deletion files, which contain ids of rows
@@ -249,8 +248,7 @@ Committing Datasets
-------------------

A new version of a dataset is committed by writing a new manifest file to the
``_versions`` directory. Only after successfully committing this file should
the ``_latest.manifest`` file be updated.
``_versions`` directory.

To prevent concurrent writers from overwriting each other, the commit process
must be atomic and consistent for all writers. If two writers try to commit
@@ -287,7 +285,6 @@ The commit process is as follows:
conflicts, abort the commit. Otherwise, continue.
4. Build a manifest and attempt to commit it to the next version. If the commit
fails because another writer has already committed, go back to step 3.
5. If the commit succeeds, update the ``_latest.manifest`` file.

When checking whether two transactions conflict, be conservative. If the
transaction file is missing, assume it conflicts. If the transaction file
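The commit loop in steps 3 and 4 of the documented process can be sketched as follows. This is a minimal sketch: `try_commit` and the error type are hypothetical stand-ins for an atomic "write manifest for version `v`, failing if it already exists" primitive, and a real implementation would also re-check transaction conflicts before each retry:

```rust
#[derive(Debug, PartialEq)]
enum CommitError {
    /// Another writer already committed this version number.
    Conflict,
}

/// Repeatedly attempt to commit the next version, advancing past
/// versions that other writers win, until a commit succeeds.
fn commit_with_retry<F>(mut latest_version: u64, mut try_commit: F) -> Result<u64, CommitError>
where
    F: FnMut(u64) -> Result<(), CommitError>,
{
    loop {
        // Step 4: attempt to atomically commit the next version.
        let next = latest_version + 1;
        match try_commit(next) {
            // Commit succeeded; this writer owns version `next`.
            Ok(()) => return Ok(next),
            // Another writer committed `next` first: pick up the new
            // latest version and go back to step 3 (conflict check).
            Err(CommitError::Conflict) => latest_version = next,
        }
    }
}

fn main() {
    // Simulate a store where versions 1 and 2 already exist.
    let mut committed = vec![1u64, 2];
    let version = commit_with_retry(1, |v| {
        if committed.contains(&v) {
            Err(CommitError::Conflict)
        } else {
            committed.push(v);
            Ok(())
        }
    })
    .unwrap();
    // The writer retries past the conflicting version 2 and lands on 3.
    assert_eq!(version, 3);
}
```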
4 changes: 2 additions & 2 deletions rust/lance-file/src/page_table.rs
@@ -216,7 +216,7 @@ mod tests {
.unwrap();
writer.shutdown().await.unwrap();

let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
let actual = PageTable::load(
@@ -285,7 +285,7 @@ mod tests {
let mut writer = tokio::fs::File::create(&path).await.unwrap();
let res = page_table.write(&mut writer, 0).await.unwrap();

let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();

14 changes: 7 additions & 7 deletions rust/lance-io/src/encodings/binary.rs
@@ -137,7 +137,7 @@ impl<'a, T: ByteArrayType> BinaryDecoder<'a, T> {
/// use lance_io::{local::LocalObjectReader, encodings::binary::BinaryDecoder, traits::Reader};
///
/// async {
/// let reader = LocalObjectReader::open_local_path("/tmp/foo.lance", 2048).await.unwrap();
/// let reader = LocalObjectReader::open_local_path("/tmp/foo.lance", 2048, None).await.unwrap();
/// let string_decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), 100, 1024, true);
/// };
/// ```
@@ -494,7 +494,7 @@ mod tests {

let pos = write_test_data(&path, arrs).await.unwrap();

let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
let read_len = arrs.iter().map(|a| a.len()).sum();
@@ -562,7 +562,7 @@ mod tests {
let pos = encoder.encode(&[&data]).await.unwrap();
object_writer.shutdown().await.unwrap();

let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);
@@ -605,7 +605,7 @@ mod tests {
let path = temp_dir.path().join("foo");

let pos = write_test_data(&path, &[&data]).await.unwrap();
let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);
@@ -627,7 +627,7 @@ mod tests {
let temp_dir = tempfile::tempdir().unwrap();
let path = temp_dir.path().join("foo");
let pos = write_test_data(&path, &[&data]).await.unwrap();
let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);
@@ -658,7 +658,7 @@ mod tests {
let path = temp_dir.path().join("foo");
let pos = write_test_data(&path, &[&data]).await.unwrap();

let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
let decoder = BinaryDecoder::<Utf8Type>::new(reader.as_ref(), pos, data.len(), false);
@@ -738,7 +738,7 @@ mod tests {
pos
};

let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
let decoder = BinaryDecoder::<BinaryType>::new(reader.as_ref(), pos, data.len(), true);
2 changes: 1 addition & 1 deletion rust/lance-io/src/encodings/dictionary.rs
@@ -246,7 +246,7 @@ mod tests {
object_writer.shutdown().await.unwrap();
}

let reader = LocalObjectReader::open_local_path(&path, 2048)
let reader = LocalObjectReader::open_local_path(&path, 2048, None)
.await
.unwrap();
let decoder = DictionaryDecoder::new(
6 changes: 3 additions & 3 deletions rust/lance-io/src/encodings/plain.rs
@@ -581,7 +581,7 @@ mod tests {
writer.flush().await.unwrap();
}

let reader = LocalObjectReader::open_local_path(&path, 1024)
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
.unwrap();
assert!(reader.size().await.unwrap() > 0);
@@ -705,7 +705,7 @@ mod tests {
writer.flush().await.unwrap();
}

let reader = LocalObjectReader::open_local_path(&path, 2048)
let reader = LocalObjectReader::open_local_path(&path, 2048, None)
.await
.unwrap();
assert!(reader.size().await.unwrap() > 0);
@@ -753,7 +753,7 @@ mod tests {
writer.shutdown().await.unwrap();
}

let reader = LocalObjectReader::open_local_path(&path, 2048)
let reader = LocalObjectReader::open_local_path(&path, 2048, None)
.await
.unwrap();
assert!(reader.size().await.unwrap() > 0);
47 changes: 34 additions & 13 deletions rust/lance-io/src/local.rs
@@ -21,6 +21,7 @@ use lance_core::{Error, Result};
use object_store::path::Path;
use snafu::{location, Location};
use tokio::io::AsyncSeekExt;
use tokio::sync::OnceCell;
use tracing::instrument;

use crate::traits::{Reader, Writer};
@@ -56,6 +57,10 @@ pub struct LocalObjectReader {
/// File path.
path: Path,

/// Known size of the file. This is either passed in on construction or
/// cached on the first metadata call.
size: OnceCell<usize>,

/// Block size, in bytes.
block_size: usize,
}
@@ -71,23 +76,20 @@ impl LocalObjectReader {
pub async fn open_local_path(
path: impl AsRef<std::path::Path>,
block_size: usize,
known_size: Option<usize>,
) -> Result<Box<dyn Reader>> {
let path = path.as_ref().to_owned();
let object_store_path = Path::from_filesystem_path(&path)?;
tokio::task::spawn_blocking(move || {
let local_file = File::open(&path)?;
Ok(Box::new(Self {
file: Arc::new(local_file),
path: object_store_path,
block_size,
}) as Box<dyn Reader>)
})
.await?
Self::open(&object_store_path, block_size, known_size).await
}

/// Open a local object reader, with default prefetch size.
#[instrument(level = "debug")]
pub async fn open(path: &Path, block_size: usize) -> Result<Box<dyn Reader>> {
pub async fn open(
path: &Path,
block_size: usize,
known_size: Option<usize>,
) -> Result<Box<dyn Reader>> {
let path = path.clone();
let local_path = to_local_path(&path);
tokio::task::spawn_blocking(move || {
@@ -98,9 +100,11 @@ impl LocalObjectReader {
},
_ => e.into(),
})?;
let size = OnceCell::new_with(known_size);
Ok(Box::new(Self {
file: Arc::new(file),
block_size,
size,
path: path.clone(),
}) as Box<dyn Reader>)
})
@@ -119,13 +123,26 @@ impl Reader for LocalObjectReader {
}

/// Returns the file size.
async fn size(&self) -> Result<usize> {
Ok(self.file.metadata()?.len() as usize)
async fn size(&self) -> object_store::Result<usize> {
let file = self.file.clone();
self.size
.get_or_try_init(|| async move {
let metadata = tokio::task::spawn_blocking(move || {
file.metadata().map_err(|err| object_store::Error::Generic {
store: "LocalFileSystem",
source: err.into(),
})
})
.await??;
Ok(metadata.len() as usize)
})
.await
.cloned()
}

/// Reads a range of data.
#[instrument(level = "debug", skip(self))]
async fn get_range(&self, range: Range<usize>) -> Result<Bytes> {
async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
let file = self.file.clone();
tokio::task::spawn_blocking(move || {
let mut buf = BytesMut::with_capacity(range.len());
@@ -140,6 +157,10 @@ impl Reader for LocalObjectReader {
Ok(buf.freeze())
})
.await?
.map_err(|err: std::io::Error| object_store::Error::Generic {
store: "LocalFileSystem",
source: err.into(),
})
}
}

32 changes: 23 additions & 9 deletions rust/lance-io/src/object_reader.rs
@@ -10,6 +10,7 @@ use deepsize::DeepSizeOf;
use futures::future::BoxFuture;
use lance_core::Result;
use object_store::{path::Path, ObjectStore};
use tokio::sync::OnceCell;
use tracing::instrument;

use crate::traits::Reader;
@@ -23,6 +24,8 @@ pub struct CloudObjectReader {
pub object_store: Arc<dyn ObjectStore>,
// File path
pub path: Path,
// File size, if known.
size: OnceCell<usize>,

block_size: usize,
}
@@ -36,10 +39,16 @@ impl DeepSizeOf for CloudObjectReader {

impl CloudObjectReader {
/// Create an ObjectReader from URI
pub fn new(object_store: Arc<dyn ObjectStore>, path: Path, block_size: usize) -> Result<Self> {
pub fn new(
object_store: Arc<dyn ObjectStore>,
path: Path,
block_size: usize,
known_size: Option<usize>,
) -> Result<Self> {
Ok(Self {
object_store,
path,
size: OnceCell::new_with(known_size),
block_size,
})
}
@@ -50,14 +59,14 @@ impl CloudObjectReader {
async fn do_with_retry<'a, O>(
&self,
f: impl Fn() -> BoxFuture<'a, std::result::Result<O, object_store::Error>>,
) -> Result<O> {
) -> object_store::Result<O> {
let mut retries = 3;
loop {
match f().await {
Ok(val) => return Ok(val),
Err(err) => {
if retries == 0 {
return Err(err.into());
return Err(err);
}
retries -= 1;
}
@@ -77,15 +86,20 @@ impl Reader for CloudObjectReader {
}

/// Object/File Size.
async fn size(&self) -> Result<usize> {
let meta = self
.do_with_retry(|| self.object_store.head(&self.path))
.await?;
Ok(meta.size)
async fn size(&self) -> object_store::Result<usize> {
self.size
.get_or_try_init(|| async move {
let meta = self
.do_with_retry(|| self.object_store.head(&self.path))
.await?;
Ok(meta.size)
})
.await
.cloned()
}

#[instrument(level = "debug", skip(self))]
async fn get_range(&self, range: Range<usize>) -> Result<Bytes> {
async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
self.do_with_retry(|| self.object_store.get_range(&self.path, range.clone()))
.await
}
20 changes: 19 additions & 1 deletion rust/lance-io/src/object_store.rs
@@ -457,11 +457,29 @@ impl ObjectStore {
/// - ``path``: Absolute path to the file.
pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
match self.scheme.as_str() {
"file" => LocalObjectReader::open(path, self.block_size).await,
"file" => LocalObjectReader::open(path, self.block_size, None).await,
_ => Ok(Box::new(CloudObjectReader::new(
self.inner.clone(),
path.clone(),
self.block_size,
None,
)?)),
}
}

/// Open a reader for a file with known size.
///
/// This size may either have been retrieved from a list operation or
/// cached metadata. By passing in the known size, we can skip a HEAD / metadata
/// call.
pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
match self.scheme.as_str() {
"file" => LocalObjectReader::open(path, self.block_size, Some(known_size)).await,
_ => Ok(Box::new(CloudObjectReader::new(
self.inner.clone(),
path.clone(),
self.block_size,
Some(known_size),
)?)),
}
}
3 changes: 2 additions & 1 deletion rust/lance-io/src/scheduler.rs
@@ -91,7 +91,8 @@ impl IoTask {
let bytes = self
.reader
.get_range(self.to_read.start as usize..self.to_read.end as usize)
.await;
.await
.map_err(Error::from);
(self.when_done)(bytes);
}
}
4 changes: 2 additions & 2 deletions rust/lance-io/src/traits.rs
@@ -87,10 +87,10 @@ pub trait Reader: std::fmt::Debug + Send + Sync + DeepSizeOf {
fn block_size(&self) -> usize;

/// Object/File Size.
async fn size(&self) -> Result<usize>;
async fn size(&self) -> object_store::Result<usize>;

/// Read a range of bytes from the object.
///
/// TODO: change to read_at()?
async fn get_range(&self, range: Range<usize>) -> Result<Bytes>;
async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes>;
}