
Commit

Fix a little more
dcherian committed Aug 15, 2024
1 parent a973727 commit 3c29319
Showing 3 changed files with 35 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/dataset.rs
@@ -586,7 +586,7 @@ impl TableRegionTracker {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Error)]
#[derive(Debug, Clone, Error)]
pub enum FlushError {
#[error("no changes made to the dataset")]
NoChangesToFlush,
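(Note: the dropped PartialEq/Eq derives here likely track the StorageError change in src/lib.rs below: if a FlushError variant wraps StorageError, whose payloads become Arc<dyn std::error::Error> in this commit, structural equality can no longer be derived, because dyn Error has no PartialEq impl.)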
12 changes: 6 additions & 6 deletions src/lib.rs
@@ -546,20 +546,20 @@ pub enum UpdateNodeError {
NotAnArray(Path),
}

#[derive(Clone, Debug, PartialEq, Eq, Error)]
#[derive(Clone, Debug, Error)]
pub enum StorageError {
#[error("object not found `{0:?}`")]
NotFound(ObjectId),
#[error("synchronization error on the Storage instance")]
Deadlock,
#[error("")]
ParseError(Path),
#[error("Error contacting object store `{0:?}`")]
ObjectStoreError(String),
ObjectStoreError(Arc<dyn std::error::Error>),
#[error("Storage layer error: {0}")]
StorageLayerError(String),
StorageLayerError(Arc<dyn std::error::Error>),
#[error("Error reading or writing to/from parquet files: `{0:?}`")]
ParquetError(String),
ParquetError(Arc<dyn std::error::Error>),
#[error("Storage layer error: {0}")]
MiscError(String),
}

/// Fetch and write the parquet files that represent the dataset in object store
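A minimal sketch, not from this commit, of the pattern the new StorageError variants use: a concrete error is wrapped with Arc::new and coerced to Arc<dyn std::error::Error>, which stays cheaply cloneable but has no PartialEq impl, which is why the PartialEq/Eq derives above were dropped. DemoError, wrap, and the thiserror dependency (suggested by the #[error(...)] attributes) are illustrative stand-ins, not crate API.

use std::sync::Arc;
use thiserror::Error;

// Illustrative stand-in mirroring the new shape of StorageError.
#[derive(Clone, Debug, Error)]
pub enum DemoError {
    #[error("Storage layer error: {0}")]
    StorageLayer(Arc<dyn std::error::Error>),
}

// Any concrete error can be wrapped; Arc::new(err) coerces to Arc<dyn Error>.
fn wrap(err: std::io::Error) -> DemoError {
    DemoError::StorageLayer(Arc::new(err))
}

fn main() {
    let e = wrap(std::io::Error::new(std::io::ErrorKind::Other, "boom"));
    // Cloning only bumps the Arc refcount; `e == e.clone()` would not compile,
    // because dyn Error (and therefore the enum) cannot derive PartialEq.
    println!("{}", e.clone());
}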
50 changes: 28 additions & 22 deletions src/storage.rs
@@ -5,18 +5,14 @@ use std::{
sync::{Arc, RwLock},
};

use arrow::array::RecordBatch;
use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use parquet::arrow::{
async_reader::ParquetObjectReader, AsyncArrowWriter, ParquetRecordBatchStreamBuilder,
};

use crate::{
AttributesTable, ChunkOffset, ManifestsTable, ObjectId, Storage, StorageError,
StorageError::StorageLayerError, StructureTable,
};
use arrow::array::RecordBatch;
use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use object_store::{local::LocalFileSystem, memory::InMemory, path::Path, ObjectStore};

#[allow(dead_code)]
@@ -51,12 +47,12 @@ impl ObjectStorage {
Ok(ObjectStorage {
store: Arc::new(
LocalFileSystem::new_with_prefix(&prefix)
.map_err(|err| StorageLayerError(err.to_string()))?,
.map_err(|err| StorageLayerError(Arc::new(err)))?,
),
prefix: prefix
.to_str()
.ok_or("Couldn't convert prefix to string")
.map_err(|err| StorageError::StorageLayerError(err.to_owned()))?
//TODO:
.ok_or(StorageError::MiscError("Couldn't convert prefix to string".to_string()))?
.to_owned(),
})
}
@@ -68,7 +64,7 @@ impl ObjectStorage {
let store = AmazonS3Builder::from_env()
.with_bucket_name(bucket_name.into())
.build()
.map_err(|err| StorageError::ObjectStoreError(err.to_string()))?;
.map_err(|err| StorageError::ObjectStoreError(Arc::new(err)))?;
Ok(ObjectStorage { store: Arc::new(store), prefix: prefix.into() })
}

@@ -85,7 +81,7 @@ impl ObjectStorage {
.with_allow_http(true)
.with_bucket_name(bucket_name.into())
.build()
.map_err(|err| StorageError::ObjectStoreError(err.to_string()))?;
.map_err(|err| StorageError::ObjectStoreError(Arc::new(err)))?;
Ok(ObjectStorage { store: Arc::new(store), prefix: prefix.into() })
}

@@ -98,22 +94,31 @@ impl ObjectStorage {
}

async fn read_parquet(&self, path: &Path) -> Result<RecordBatch, StorageError> {
use crate::StorageError::ParquetError;
use parquet::arrow::{
async_reader::ParquetObjectReader, ParquetRecordBatchStreamBuilder,
};

// TODO: avoid this read since we are always reading the whole thing.
let meta = self
.store
.head(path)
.await
.map_err(|err| StorageError::ParquetError(err.to_string()))?;
.map_err(|err| StorageError::ParquetError(Arc::new(err)))?;
let reader = ParquetObjectReader::new(Arc::clone(&self.store), meta);
let mut builder = ParquetRecordBatchStreamBuilder::new(reader)
.await
.map_err(|err| StorageError::ParquetError(err.to_string()))?
.map_err(|err| StorageError::ParquetError(Arc::new(err)))?
.build()
.map_err(|err| StorageError::ParquetError(err.to_string()))?;
.map_err(|err| StorageError::ParquetError(Arc::new(err)))?;

// TODO: do we always have only one batch ever? Assert that
// TODO: Use `if let`;
Ok(builder.next().await.unwrap().unwrap())
let maybe_batch = builder.next().await;
if let Some(batch) = maybe_batch {
batch.map_err(|err| ParquetError(Arc::new(err)))
} else {
Err(StorageError::MiscError("ParquetError: no more record batches".to_string()))
}
}

async fn write_parquet(
@@ -122,18 +127,19 @@ impl ObjectStorage {
batch: &RecordBatch,
) -> Result<(), StorageError> {
use crate::StorageError::ParquetError;
use parquet::arrow::async_writer::AsyncArrowWriter;
let mut buffer = Vec::new();
let mut writer = AsyncArrowWriter::try_new(&mut buffer, batch.schema(), None)
.map_err(|err| ParquetError(err.to_string()))?;
writer.write(batch).await.map_err(|err| ParquetError(err.to_string()))?;
writer.close().await.map_err(|err| ParquetError(err.to_string()))?;
.map_err(|err| ParquetError(Arc::new(err)))?;
writer.write(batch).await.map_err(|err| ParquetError(Arc::new(err)))?;
writer.close().await.map_err(|err| ParquetError(Arc::new(err)))?;

// TODO: find object_store streaming interface
let payload = object_store::PutPayload::from(buffer);
self.store
.put(path, payload)
.await
.map_err(|err| StorageLayerError(err.to_string()))?;
.map_err(|err| StorageLayerError(Arc::new(err)))?;
Ok(())
}
}
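
The rewritten read_parquet above replaces two unwraps with explicit handling of the Option<Result<RecordBatch, _>> yielded by the stream's next().await. A dependency-free sketch of the same shape, with illustrative names (DemoStorageError, first_batch) that are not part of the crate:

use std::sync::Arc;

#[derive(Debug)]
enum DemoStorageError {
    Parquet(Arc<dyn std::error::Error>),
    Misc(String),
}

// A stream's next().await yields Option<Result<T, E>>; map both the
// "no more items" case and the item-level error into the storage error type.
fn first_batch<T>(
    next: Option<Result<T, std::io::Error>>,
) -> Result<T, DemoStorageError> {
    if let Some(batch) = next {
        batch.map_err(|err| DemoStorageError::Parquet(Arc::new(err)))
    } else {
        Err(DemoStorageError::Misc("no more record batches".to_string()))
    }
}

fn main() {
    let ok = first_batch(Some(Ok(42)));
    assert!(ok.is_ok());
    let empty: Result<i32, _> = first_batch(None);
    assert!(empty.is_err());
}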
