Skip to content

Commit

Permalink
fix: get prefix from offset path
Browse files: browse the repository at this point in the history
Signed-off-by: Robert Pack <[email protected]>
  • Loading branch information
roeap committed Feb 14, 2025
1 parent 16d2557 commit a3a7671
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 41 deletions.
8 changes: 2 additions & 6 deletions ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ impl<T> Default for ReferenceSet<T> {
#[cfg(test)]
mod tests {
use delta_kernel::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine};
use object_store::{memory::InMemory, path::Path};
use object_store::memory::InMemory;
use test_utils::{actions_to_string, add_commit, TestAction};

use super::*;
Expand Down Expand Up @@ -792,11 +792,7 @@ mod tests {
actions_to_string(vec![TestAction::Metadata]),
)
.await?;
let engine = DefaultEngine::new(
storage.clone(),
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
);
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
let engine = engine_to_handle(Arc::new(engine), allocate_err);
let path = "memory:///";

Expand Down
27 changes: 14 additions & 13 deletions kernel/src/engine/default/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::{DeltaResult, Error, FileMeta, FileSlice, FileSystemClient};
pub struct ObjectStoreFileSystemClient<E: TaskExecutor> {
inner: Arc<DynObjectStore>,
has_ordered_listing: bool,
table_root: Path,
task_executor: Arc<E>,
readahead: usize,
}
Expand All @@ -23,13 +22,11 @@ impl<E: TaskExecutor> ObjectStoreFileSystemClient<E> {
pub(crate) fn new(
store: Arc<DynObjectStore>,
has_ordered_listing: bool,
table_root: Path,
task_executor: Arc<E>,
) -> Self {
Self {
inner: store,
has_ordered_listing,
table_root,
task_executor,
readahead: 10,
}
Expand All @@ -49,8 +46,14 @@ impl<E: TaskExecutor> FileSystemClient for ObjectStoreFileSystemClient<E> {
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<FileMeta>>>> {
let url = path.clone();
let offset = Path::from(path.path());
// TODO properly handle table prefix
let prefix = self.table_root.child("_delta_log");
let parts = offset.parts().collect_vec();
if parts.is_empty() {
return Err(Error::generic(format!(
"Offset path must not be a root directory. Got: '{}'",
url.as_str()
)));
}
let prefix = Path::from_iter(parts[..parts.len() - 1].iter().cloned());

let store = self.inner.clone();

Expand Down Expand Up @@ -192,11 +195,9 @@ mod tests {
let mut url = Url::from_directory_path(tmp.path()).unwrap();

let store = Arc::new(LocalFileSystem::new());
let prefix = Path::from(url.path());
let client = ObjectStoreFileSystemClient::new(
store,
false, // don't have ordered listing
prefix,
Arc::new(TokioBackgroundExecutor::new()),
);

Expand Down Expand Up @@ -229,11 +230,10 @@ mod tests {
store.put(&name, data.clone().into()).await.unwrap();

let table_root = Url::parse("memory:///").expect("valid url");
let prefix = Path::from_url_path(table_root.path()).expect("Couldn't get path");
let engine = DefaultEngine::new(store, prefix, Arc::new(TokioBackgroundExecutor::new()));
let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new()));
let files: Vec<_> = engine
.get_file_system_client()
.list_from(&table_root)
.list_from(&table_root.join("_delta_log/0").unwrap())
.unwrap()
.try_collect()
.unwrap();
Expand All @@ -260,11 +260,12 @@ mod tests {

let url = Url::from_directory_path(tmp.path()).unwrap();
let store = Arc::new(LocalFileSystem::new());
let prefix = Path::from_url_path(url.path()).expect("Couldn't get path");
let engine = DefaultEngine::new(store, prefix, Arc::new(TokioBackgroundExecutor::new()));
let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new()));
let client = engine.get_file_system_client();

let files = client.list_from(&Url::parse("file://").unwrap()).unwrap();
let files = client
.list_from(&url.join("_delta_log/0").unwrap())
.unwrap();
let mut len = 0;
for (file, expected) in files.zip(expected_names.iter()) {
assert!(
Expand Down
9 changes: 4 additions & 5 deletions kernel/src/engine/default/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use self::storage::parse_url_opts;
use object_store::{path::Path, DynObjectStore};
use object_store::DynObjectStore;
use url::Url;

use self::executor::TaskExecutor;
Expand Down Expand Up @@ -60,8 +60,8 @@ impl<E: TaskExecutor> DefaultEngine<E> {
V: Into<String>,
{
// table root is the path of the table in the ObjectStore
let (store, table_root) = parse_url_opts(table_root, options)?;
Ok(Self::new(Arc::new(store), table_root, task_executor))
let (store, _table_root) = parse_url_opts(table_root, options)?;
Ok(Self::new(Arc::new(store), task_executor))
}

/// Create a new [`DefaultEngine`] instance
Expand All @@ -71,7 +71,7 @@ impl<E: TaskExecutor> DefaultEngine<E> {
/// - `store`: The object store to use.
/// - `task_executor`: Used to spawn async IO tasks. See [executor::TaskExecutor].
pub fn new(store: Arc<DynObjectStore>, table_root: Path, task_executor: Arc<E>) -> Self {
pub fn new(store: Arc<DynObjectStore>, task_executor: Arc<E>) -> Self {
// HACK to check if we're using a LocalFileSystem from ObjectStore. We need this because
// local filesystem doesn't return a sorted list by default. Although the `object_store`
// crate explicitly says it _does not_ return a sorted listing, in practice all the cloud
Expand All @@ -97,7 +97,6 @@ impl<E: TaskExecutor> DefaultEngine<E> {
file_system: Arc::new(ObjectStoreFileSystemClient::new(
store.clone(),
!is_local,
table_root,
task_executor.clone(),
)),
json: Arc::new(DefaultJsonHandler::new(
Expand Down
1 change: 0 additions & 1 deletion kernel/src/log_segment/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ fn build_log_with_paths_and_checkpoint(
let client = ObjectStoreFileSystemClient::new(
store,
false, // don't have ordered listing
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
);

Expand Down
3 changes: 0 additions & 3 deletions kernel/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,9 @@ mod tests {
let url = url::Url::from_directory_path(path).unwrap();

let store = Arc::new(LocalFileSystem::new());
let prefix = Path::from(url.path());
let client = ObjectStoreFileSystemClient::new(
store,
false, // don't have ordered listing
prefix,
Arc::new(TokioBackgroundExecutor::new()),
);
let cp = read_last_checkpoint(&client, &url).unwrap();
Expand Down Expand Up @@ -291,7 +289,6 @@ mod tests {
let client = ObjectStoreFileSystemClient::new(
store,
false, // don't have ordered listing
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
);
let url = Url::parse("memory:///valid/").expect("valid url");
Expand Down
14 changes: 2 additions & 12 deletions kernel/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ async fn single_commit_two_add_files() -> Result<(), Box<dyn std::error::Error>>
let location = Url::parse("memory:///")?;
let engine = Arc::new(DefaultEngine::new(
storage.clone(),
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
));

Expand Down Expand Up @@ -113,11 +112,7 @@ async fn two_commits() -> Result<(), Box<dyn std::error::Error>> {
.await?;

let location = Url::parse("memory:///").unwrap();
let engine = DefaultEngine::new(
storage.clone(),
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
);
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));

let table = Table::new(location);
let expected_data = vec![batch.clone(), batch];
Expand Down Expand Up @@ -171,11 +166,7 @@ async fn remove_action() -> Result<(), Box<dyn std::error::Error>> {
.await?;

let location = Url::parse("memory:///").unwrap();
let engine = DefaultEngine::new(
storage.clone(),
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
);
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));

let table = Table::new(location);
let expected_data = vec![batch];
Expand Down Expand Up @@ -249,7 +240,6 @@ async fn stats() -> Result<(), Box<dyn std::error::Error>> {
let location = Url::parse("memory:///").unwrap();
let engine = Arc::new(DefaultEngine::new(
storage.clone(),
Path::from(""),
Arc::new(TokioBackgroundExecutor::new()),
));

Expand Down
2 changes: 1 addition & 1 deletion kernel/tests/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fn setup(
let table_root_path = Path::from(format!("{base_path}{table_name}"));
let url = Url::parse(&format!("{base_url}{table_root_path}/")).unwrap();
let executor = Arc::new(TokioBackgroundExecutor::new());
let engine = DefaultEngine::new(Arc::clone(&storage), table_root_path, executor);
let engine = DefaultEngine::new(Arc::clone(&storage), executor);

(storage, engine, url)
}
Expand Down

0 comments on commit a3a7671

Please sign in to comment.