From dc1ae54f549c18502af151d82a11c41089d8a61c Mon Sep 17 00:00:00 2001
From: Deepak Cherian
Date: Fri, 16 Aug 2024 15:54:51 -0600
Subject: [PATCH] Address Comments

---
 src/lib.rs     | 14 ++++++-------
 src/storage.rs | 53 ++++++++++++++++++++++++--------------------------
 2 files changed, 32 insertions(+), 35 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index e2a804d6..777b5ecc 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -567,16 +567,16 @@ pub enum StorageError {
     NotFound(ObjectId),
     #[error("synchronization error on the Storage instance")]
     Deadlock,
-    #[error("Error contacting object store {0}")]
+    #[error("error contacting object store {0}")]
     ObjectStore(#[from] object_store::Error),
-    #[error("Error reading or writing to/from parquet files: {0}")]
+    #[error("error reading or writing to/from parquet files: {0}")]
     ParquetError(#[from] parquet_errors::ParquetError),
-    #[error("Error reading RecordBatch from parquet files.")]
-    BadRecordBatchRead,
-    #[error("Bad byte range for chunk read `{0:?}`.")]
-    BadByteRange(Option<Range<ChunkOffset>>),
-    #[error("I/O error: `{0:?}`")]
+    #[error("error reading RecordBatch from parquet file {0}.")]
+    BadRecordBatchRead(String),
+    #[error("i/o error: `{0:?}`")]
     IOError(#[from] io::Error),
+    #[error("bad path: {0}")]
+    BadPath(Path),
 }
 
 /// Fetch and write the parquet files that represent the dataset in object store
diff --git a/src/storage.rs b/src/storage.rs
index c0dadb59..e23e1ba9 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -7,21 +7,26 @@ use std::{
 };
 
 use crate::{
-    AttributesTable, ChunkOffset, ManifestsTable, ObjectId, Storage, StorageError,
+    AttributesTable, ChunkOffset, ManifestsTable, ObjectId, Path, Storage, StorageError,
     StructureTable,
 };
 use arrow::array::RecordBatch;
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::StreamExt;
-use object_store::{local::LocalFileSystem, memory::InMemory, path::Path, ObjectStore};
+use object_store::{
+    local::LocalFileSystem, memory::InMemory, path::Path as ObjectPath, ObjectStore,
+};
+use parquet::arrow::async_writer::AsyncArrowWriter;
+use parquet::arrow::{
+    async_reader::ParquetObjectReader, ParquetRecordBatchStreamBuilder,
+};
 
 const STRUCTURE_PREFIX: &str = "s/";
 const MANIFEST_PREFIX: &str = "m/";
 // const ATTRIBUTES_PREFIX: &str = "a/";
 const CHUNK_PREFIX: &str = "c/";
 
-// #[derive(Default)]
 pub struct ObjectStorage {
     store: Arc<dyn ObjectStore>,
     prefix: String,
@@ -31,16 +36,14 @@ impl ObjectStorage {
     pub fn new_in_memory_store() -> ObjectStorage {
         ObjectStorage { store: Arc::new(InMemory::new()), prefix: "".into() }
     }
-    pub fn new_local_store(
-        prefix: std::path::PathBuf,
-    ) -> Result<ObjectStorage, StorageError> {
+    pub fn new_local_store(prefix: &Path) -> Result<ObjectStorage, StorageError> {
         create_dir_all(prefix.as_path())?;
+        let prefix = prefix
+            .to_str()
+            .ok_or(StorageError::BadPath(prefix.to_owned()))?;
         Ok(ObjectStorage {
-            store: Arc::new(LocalFileSystem::new_with_prefix(&prefix)?),
-            // We rely on `new_with_prefix` to create the `prefix` directory
-            // if it doesn't exist. It will also add the prefix to any path
-            // so we set ObjectStorate::prefix to an empty string.
- prefix: "".to_string(), + store: Arc::new(LocalFileSystem::new()), + prefix: prefix.to_owned().to_string(), }) } pub fn new_s3_store_from_env( @@ -69,7 +72,7 @@ impl ObjectStorage { Ok(ObjectStorage { store: Arc::new(store), prefix: prefix.into() }) } - fn get_path(&self, file_prefix: &str, ObjectId(asu8): &ObjectId) -> Path { + fn get_path(&self, file_prefix: &str, ObjectId(asu8): &ObjectId) -> ObjectPath { // TODO: be careful about allocation here let path = format!( "{}/{}/{}.parquet", @@ -77,33 +80,24 @@ impl ObjectStorage { file_prefix, BASE64_URL_SAFE.encode(asu8) ); - Path::from(path) + ObjectPath::from(path) } - async fn read_parquet(&self, path: &Path) -> Result { - use parquet::arrow::{ - async_reader::ParquetObjectReader, ParquetRecordBatchStreamBuilder, - }; - - // TODO: avoid this metadata read since we are always reading the whole thing. + async fn read_parquet(&self, path: &ObjectPath) -> Result { + // FIXME: avoid this metadata read since we are always reading the whole thing. let meta = self.store.head(path).await?; let reader = ParquetObjectReader::new(Arc::clone(&self.store), meta); let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await?.build()?; // TODO: do we always have only one batch ever? Assert that let maybe_batch = builder.next().await; - if let Some(batch) = maybe_batch { - Ok(batch?) - } else { - Err(StorageError::BadRecordBatchRead) - } + Ok(maybe_batch.ok_or(StorageError::BadRecordBatchRead(path.to_string()))??) } async fn write_parquet( &self, - path: &Path, + path: &ObjectPath, batch: &RecordBatch, ) -> Result<(), StorageError> { - use parquet::arrow::async_writer::AsyncArrowWriter; let mut buffer = Vec::new(); let mut writer = AsyncArrowWriter::try_new(&mut buffer, batch.schema(), None)?; writer.write(batch).await?; @@ -335,6 +329,7 @@ impl Storage for InMemoryStorage { #[cfg(test)] mod tests { + use std::env::temp_dir; use std::sync::Arc; use crate::ObjectId; @@ -358,15 +353,17 @@ mod tests { async fn test_read_write_parquet_object_storage() { // simple test to make sure we can speak to all stores let batch = make_record_batch(); - let prefix: String = rand::thread_rng() + let mut prefix = temp_dir().to_str().unwrap().to_string(); + let rdms: String = rand::thread_rng() .sample_iter(&Alphanumeric) .take(7) .map(char::from) .collect(); + prefix.push_str(rdms.as_str()); for store in [ ObjectStorage::new_in_memory_store(), - ObjectStorage::new_local_store(prefix.clone().into()).unwrap(), + ObjectStorage::new_local_store(&prefix.into()).unwrap(), // ObjectStorage::new_s3_store_from_env("testbucket".to_string()).unwrap(), // ObjectStorage::new_s3_store_with_config("testbucket".to_string(), prefix) // .unwrap(),