From 3c29319d666dee2411bc39628c8806b8b489e40b Mon Sep 17 00:00:00 2001
From: Deepak Cherian
Date: Thu, 15 Aug 2024 16:01:27 -0600
Subject: [PATCH] Fix a little more

---
 src/dataset.rs |  2 +-
 src/lib.rs     | 12 ++++++------
 src/storage.rs | 50 ++++++++++++++++++++++++++++----------------------
 3 files changed, 35 insertions(+), 29 deletions(-)

diff --git a/src/dataset.rs b/src/dataset.rs
index 60014581..1a567801 100644
--- a/src/dataset.rs
+++ b/src/dataset.rs
@@ -586,7 +586,7 @@ impl TableRegionTracker {
     }
 }
 
-#[derive(Debug, Clone, PartialEq, Eq, Error)]
+#[derive(Debug, Clone, Error)]
 pub enum FlushError {
     #[error("no changes made to the dataset")]
     NoChangesToFlush,
diff --git a/src/lib.rs b/src/lib.rs
index e059a5c5..98a44040 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -546,20 +546,20 @@ pub enum UpdateNodeError {
     NotAnArray(Path),
 }
 
-#[derive(Clone, Debug, PartialEq, Eq, Error)]
+#[derive(Clone, Debug, Error)]
 pub enum StorageError {
     #[error("object not found `{0:?}`")]
     NotFound(ObjectId),
     #[error("synchronization error on the Storage instance")]
     Deadlock,
-    #[error("")]
-    ParseError(Path),
     #[error("Error contacting object store `{0:?}`")]
-    ObjectStoreError(String),
+    ObjectStoreError(Arc<dyn std::error::Error + Sync + Send>),
     #[error("Storage layer error: {0}")]
-    StorageLayerError(String),
+    StorageLayerError(Arc<dyn std::error::Error + Sync + Send>),
     #[error("Error reading or writing to/from parquet files: `{0:?}`")]
-    ParquetError(String),
+    ParquetError(Arc<dyn std::error::Error + Sync + Send>),
+    #[error("{0}")]
+    MiscError(String),
 }
 
 /// Fetch and write the parquet files that represent the dataset in object store
diff --git a/src/storage.rs b/src/storage.rs
index 763c9b74..9cf9f932 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -5,18 +5,14 @@ use std::{
     sync::{Arc, RwLock},
 };
 
-use arrow::array::RecordBatch;
-use async_trait::async_trait;
-use bytes::Bytes;
-use futures::StreamExt;
-use parquet::arrow::{
-    async_reader::ParquetObjectReader, AsyncArrowWriter, ParquetRecordBatchStreamBuilder,
-};
-
 use crate::{
     AttributesTable, ChunkOffset, ManifestsTable, ObjectId, Storage, StorageError,
     StorageError::StorageLayerError, StructureTable,
 };
+use arrow::array::RecordBatch;
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::StreamExt;
 use object_store::{local::LocalFileSystem, memory::InMemory, path::Path, ObjectStore};
 
 #[allow(dead_code)]
@@ -51,12 +47,12 @@ impl ObjectStorage {
         Ok(ObjectStorage {
             store: Arc::new(
                 LocalFileSystem::new_with_prefix(&prefix)
-                    .map_err(|err| StorageLayerError(err.to_string()))?,
+                    .map_err(|err| StorageLayerError(Arc::new(err)))?,
             ),
             prefix: prefix
                 .to_str()
-                .ok_or("Couldn't convert prefix to string")
-                .map_err(|err| StorageError::StorageLayerError(err.to_owned()))?
+                //TODO:
+                .ok_or(StorageError::MiscError("Couldn't convert prefix to string".to_string()))?
                 .to_owned(),
         })
     }
@@ -68,7 +64,7 @@ impl ObjectStorage {
         let store = AmazonS3Builder::from_env()
             .with_bucket_name(bucket_name.into())
             .build()
-            .map_err(|err| StorageError::ObjectStoreError(err.to_string()))?;
+            .map_err(|err| StorageError::ObjectStoreError(Arc::new(err)))?;
         Ok(ObjectStorage { store: Arc::new(store), prefix: prefix.into() })
     }
 
@@ -85,7 +81,7 @@ impl ObjectStorage {
             .with_allow_http(true)
             .with_bucket_name(bucket_name.into())
             .build()
-            .map_err(|err| StorageError::ObjectStoreError(err.to_string()))?;
+            .map_err(|err| StorageError::ObjectStoreError(Arc::new(err)))?;
         Ok(ObjectStorage { store: Arc::new(store), prefix: prefix.into() })
     }
 
@@ -98,22 +94,31 @@ impl ObjectStorage {
     }
 
     async fn read_parquet(&self, path: &Path) -> Result<RecordBatch, StorageError> {
+        use crate::StorageError::ParquetError;
+        use parquet::arrow::{
+            async_reader::ParquetObjectReader, ParquetRecordBatchStreamBuilder,
+        };
+
         // TODO: avoid this read since we are always reading the whole thing.
         let meta = self
             .store
             .head(path)
             .await
-            .map_err(|err| StorageError::ParquetError(err.to_string()))?;
+            .map_err(|err| StorageError::ParquetError(Arc::new(err)))?;
         let reader = ParquetObjectReader::new(Arc::clone(&self.store), meta);
         let mut builder = ParquetRecordBatchStreamBuilder::new(reader)
             .await
-            .map_err(|err| StorageError::ParquetError(err.to_string()))?
+            .map_err(|err| StorageError::ParquetError(Arc::new(err)))?
             .build()
-            .map_err(|err| StorageError::ParquetError(err.to_string()))?;
+            .map_err(|err| StorageError::ParquetError(Arc::new(err)))?;
         // TODO: do we always have only one batch ever? Assert that
-        // TODO: Use `if let`;
-        Ok(builder.next().await.unwrap().unwrap())
+        let maybe_batch = builder.next().await;
+        if let Some(batch) = maybe_batch {
+            batch.map_err(|err| ParquetError(Arc::new(err)))
+        } else {
+            Err(StorageError::MiscError("ParquetError: no more record batches".to_string()))
+        }
     }
 
     async fn write_parquet(
         &self,
@@ -122,18 +127,19 @@ impl ObjectStorage {
         path: &Path,
         batch: &RecordBatch,
     ) -> Result<(), StorageError> {
         use crate::StorageError::ParquetError;
+        use parquet::arrow::async_writer::AsyncArrowWriter;
         let mut buffer = Vec::new();
         let mut writer = AsyncArrowWriter::try_new(&mut buffer, batch.schema(), None)
-            .map_err(|err| ParquetError(err.to_string()))?;
-        writer.write(batch).await.map_err(|err| ParquetError(err.to_string()))?;
-        writer.close().await.map_err(|err| ParquetError(err.to_string()))?;
+            .map_err(|err| ParquetError(Arc::new(err)))?;
+        writer.write(batch).await.map_err(|err| ParquetError(Arc::new(err)))?;
+        writer.close().await.map_err(|err| ParquetError(Arc::new(err)))?;
         // TODO: find object_store streaming interface
         let payload = object_store::PutPayload::from(buffer);
         self.store
             .put(path, payload)
             .await
-            .map_err(|err| StorageLayerError(err.to_string()))?;
+            .map_err(|err| StorageLayerError(Arc::new(err)))?;
         Ok(())
     }
 }
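
Note on the pattern: replacing the `String` payloads with `Arc<dyn std::error::Error + Sync + Send>` keeps `StorageError` cloneable while preserving the original error and its source chain, instead of flattening it to text at the error site (trait objects don't implement `PartialEq`, which is why the `PartialEq, Eq` derives are dropped). A minimal, self-contained sketch of that idea, assuming the `thiserror` crate that the `#[derive(..., Error)]` attributes imply; the `main` body and the `std::io::Error` are illustrative only, not part of the patch:

    use std::sync::Arc;
    use thiserror::Error;

    // Cloneable error enum wrapping the source error in `Arc` instead of
    // stringifying it, mirroring the change made to `StorageError` above.
    #[derive(Clone, Debug, Error)]
    pub enum StorageError {
        #[error("Storage layer error: {0}")]
        StorageLayerError(Arc<dyn std::error::Error + Sync + Send>),
        #[error("{0}")]
        MiscError(String),
    }

    fn main() {
        // `Arc::new(err)` unsize-coerces any concrete `Error + Sync + Send`
        // type into the trait-object payload, which is exactly what the
        // `map_err(|err| StorageLayerError(Arc::new(err)))` calls rely on.
        let io_err = std::io::Error::new(std::io::ErrorKind::NotFound, "missing chunk");
        let err = StorageError::StorageLayerError(Arc::new(io_err));

        // `Clone` works even though `std::io::Error` itself is not `Clone`.
        let copy = err.clone();
        println!("{copy}");
        println!("{}", StorageError::MiscError("no more record batches".to_string()));
    }

The trade-off versus `String`: `err.to_string()` also satisfies `Clone`, but it erases the error's type and `source()` chain; the `Arc` keeps both and is still cheap to clone.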