Address Comments
dcherian committed Aug 16, 2024
1 parent ffff1ce commit 398e481
Showing 2 changed files with 30 additions and 35 deletions.
14 changes: 7 additions & 7 deletions src/lib.rs
@@ -567,16 +567,16 @@ pub enum StorageError {
     NotFound(ObjectId),
     #[error("synchronization error on the Storage instance")]
     Deadlock,
-    #[error("Error contacting object store {0}")]
+    #[error("error contacting object store {0}")]
     ObjectStore(#[from] object_store::Error),
-    #[error("Error reading or writing to/from parquet files: {0}")]
+    #[error("error reading or writing to/from parquet files: {0}")]
     ParquetError(#[from] parquet_errors::ParquetError),
-    #[error("Error reading RecordBatch from parquet files.")]
-    BadRecordBatchRead,
-    #[error("Bad byte range for chunk read `{0:?}`.")]
-    BadByteRange(Option<Range<ChunkOffset>>),
-    #[error("I/O error: `{0:?}`")]
+    #[error("error reading RecordBatch from parquet file {0}.")]
+    BadRecordBatchRead(String),
+    #[error("i/o error: `{0:?}`")]
     IOError(#[from] io::Error),
+    #[error("bad path: {0}")]
+    BadPath(Path),
 }
 
 /// Fetch and write the parquet files that represent the dataset in object store
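The lowercased messages and the payload-carrying variants above follow the attribute syntax of the thiserror crate. A minimal sketch (assuming thiserror; the read_all helper is illustrative, not from this codebase) of why the #[from] attributes matter: the ? operator converts the underlying error into the matching StorageError variant with no explicit map_err.

use std::io::{self, Read};

#[derive(Debug, thiserror::Error)]
pub enum StorageError {
    #[error("i/o error: `{0:?}`")]
    IOError(#[from] io::Error),
    #[error("error reading RecordBatch from parquet file {0}.")]
    BadRecordBatchRead(String),
}

// Illustrative: the io::Error raised by read_to_end is converted to
// StorageError::IOError by `?` because of the #[from] attribute.
fn read_all(mut r: impl Read) -> Result<Vec<u8>, StorageError> {
    let mut buf = Vec::new();
    r.read_to_end(&mut buf)?;
    Ok(buf)
}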
51 changes: 23 additions & 28 deletions src/storage.rs
@@ -7,21 +7,26 @@ use std::{
 };
 
 use crate::{
-    AttributesTable, ChunkOffset, ManifestsTable, ObjectId, Storage, StorageError,
+    AttributesTable, ChunkOffset, ManifestsTable, ObjectId, Path, Storage, StorageError,
     StructureTable,
 };
 use arrow::array::RecordBatch;
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::StreamExt;
-use object_store::{local::LocalFileSystem, memory::InMemory, path::Path, ObjectStore};
+use object_store::{
+    local::LocalFileSystem, memory::InMemory, path::Path as ObjectPath, ObjectStore,
+};
+use parquet::arrow::async_writer::AsyncArrowWriter;
+use parquet::arrow::{
+    async_reader::ParquetObjectReader, ParquetRecordBatchStreamBuilder,
+};
 
 const STRUCTURE_PREFIX: &str = "s/";
 const MANIFEST_PREFIX: &str = "m/";
 // const ATTRIBUTES_PREFIX: &str = "a/";
 const CHUNK_PREFIX: &str = "c/";
 
-// #[derive(Default)]
 pub struct ObjectStorage {
     store: Arc<dyn ObjectStore>,
     prefix: String,
@@ -31,16 +31,12 @@ impl ObjectStorage {
     pub fn new_in_memory_store() -> ObjectStorage {
         ObjectStorage { store: Arc::new(InMemory::new()), prefix: "".into() }
     }
-    pub fn new_local_store(
-        prefix: std::path::PathBuf,
-    ) -> Result<ObjectStorage, StorageError> {
+    pub fn new_local_store(prefix: &Path) -> Result<ObjectStorage, StorageError> {
         create_dir_all(prefix.as_path())?;
+        let prefix = prefix.to_str().ok_or(StorageError::BadPath(prefix.to_owned()))?;
         Ok(ObjectStorage {
-            store: Arc::new(LocalFileSystem::new_with_prefix(&prefix)?),
-            // We rely on `new_with_prefix` to create the `prefix` directory
-            // if it doesn't exist. It will also add the prefix to any path
-            // so we set ObjectStorate::prefix to an empty string.
-            prefix: "".to_string(),
+            store: Arc::new(LocalFileSystem::new()),
+            prefix: prefix.to_owned().to_string(),
         })
     }
     pub fn new_s3_store_from_env(
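The reworked new_local_store converts the prefix to UTF-8 up front and reports failure through the new BadPath variant rather than panicking. A std-only sketch of that to_str/ok_or pattern (the helper name and the plain String error are illustrative):

use std::path::Path;

fn utf8_prefix(prefix: &Path) -> Result<String, String> {
    // to_str returns None for non-UTF-8 paths; ok_or_else maps that
    // absence onto an error value instead of unwrapping.
    prefix
        .to_str()
        .map(str::to_owned)
        .ok_or_else(|| format!("bad path: {}", prefix.display()))
}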
@@ -69,41 +69,32 @@ impl ObjectStorage {
         Ok(ObjectStorage { store: Arc::new(store), prefix: prefix.into() })
     }
 
-    fn get_path(&self, file_prefix: &str, ObjectId(asu8): &ObjectId) -> Path {
+    fn get_path(&self, file_prefix: &str, ObjectId(asu8): &ObjectId) -> ObjectPath {
         // TODO: be careful about allocation here
         let path = format!(
             "{}/{}/{}.parquet",
             self.prefix,
             file_prefix,
             BASE64_URL_SAFE.encode(asu8)
         );
-        Path::from(path)
+        ObjectPath::from(path)
     }
 
-    async fn read_parquet(&self, path: &Path) -> Result<RecordBatch, StorageError> {
-        use parquet::arrow::{
-            async_reader::ParquetObjectReader, ParquetRecordBatchStreamBuilder,
-        };
-
-        // TODO: avoid this metadata read since we are always reading the whole thing.
+    async fn read_parquet(&self, path: &ObjectPath) -> Result<RecordBatch, StorageError> {
+        // FIXME: avoid this metadata read since we are always reading the whole thing.
         let meta = self.store.head(path).await?;
         let reader = ParquetObjectReader::new(Arc::clone(&self.store), meta);
         let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await?.build()?;
         // TODO: do we always have only one batch ever? Assert that
         let maybe_batch = builder.next().await;
-        if let Some(batch) = maybe_batch {
-            Ok(batch?)
-        } else {
-            Err(StorageError::BadRecordBatchRead)
-        }
+        Ok(maybe_batch.ok_or(StorageError::BadRecordBatchRead(path.to_string()))??)
     }
 
     async fn write_parquet(
         &self,
-        path: &Path,
+        path: &ObjectPath,
         batch: &RecordBatch,
     ) -> Result<(), StorageError> {
-        use parquet::arrow::async_writer::AsyncArrowWriter;
         let mut buffer = Vec::new();
         let mut writer = AsyncArrowWriter::try_new(&mut buffer, batch.schema(), None)?;
         writer.write(batch).await?;
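The new read_parquet body collapses the old if let into one expression over an Option<Result<_, _>>. A self-contained sketch of that double-? flattening (simplified types; this Error enum is a stand-in for StorageError): ok_or turns a missing batch into an error, the first ? unwraps that outer Result, and the second ? propagates the inner read error via From.

#[derive(Debug)]
enum Error {
    Missing(String),
    Io(std::io::Error),
}

impl From<std::io::Error> for Error {
    fn from(e: std::io::Error) -> Self {
        Error::Io(e)
    }
}

// `maybe` mirrors builder.next().await: None means no batch at all,
// Some(Err(_)) means the read itself failed.
fn first_batch(
    maybe: Option<Result<u64, std::io::Error>>,
    path: &str,
) -> Result<u64, Error> {
    Ok(maybe.ok_or(Error::Missing(path.to_string()))??)
}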
@@ -335,6 +327,7 @@ impl Storage for InMemoryStorage {
 
 #[cfg(test)]
 mod tests {
+    use std::env::temp_dir;
     use std::sync::Arc;
 
     use crate::ObjectId;
@@ -358,15 +351,17 @@
     async fn test_read_write_parquet_object_storage() {
         // simple test to make sure we can speak to all stores
         let batch = make_record_batch();
-        let prefix: String = rand::thread_rng()
+        let mut prefix = temp_dir().to_str().unwrap().to_string();
+        let rdms: String = rand::thread_rng()
             .sample_iter(&Alphanumeric)
             .take(7)
             .map(char::from)
             .collect();
+        prefix.push_str(rdms.as_str());
 
         for store in [
             ObjectStorage::new_in_memory_store(),
-            ObjectStorage::new_local_store(prefix.clone().into()).unwrap(),
+            ObjectStorage::new_local_store(&prefix.into()).unwrap(),
             // ObjectStorage::new_s3_store_from_env("testbucket".to_string()).unwrap(),
             // ObjectStorage::new_s3_store_with_config("testbucket".to_string(), prefix)
             //     .unwrap(),
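The updated test builds its local-store prefix from the OS temp directory plus a random suffix so concurrent runs don't collide. A sketch of that construction, assuming a rand 0.8-style Alphanumeric distribution (the helper name is illustrative; note the suffix is concatenated directly onto the temp-dir string, exactly as in the test):

use rand::{distributions::Alphanumeric, Rng};
use std::env::temp_dir;

// Mirrors the test setup: e.g. "/tmp" + "aB3xQ9z" -> "/tmpaB3xQ9z".
fn unique_test_prefix() -> String {
    let mut prefix = temp_dir().to_str().unwrap().to_string();
    let suffix: String = rand::thread_rng()
        .sample_iter(&Alphanumeric)
        .take(7)
        .map(char::from)
        .collect();
    prefix.push_str(&suffix);
    prefix
}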
