Skip to content

Commit

Permalink
feat: support to copy from orc format (GreptimeTeam#1814)
Browse files Browse the repository at this point in the history
* feat: support to copy from orc format

* test: add copy from orc test

* chore: add license header

* refactor: remove unimplemented macro

* chore: apply suggestions from CR

* chore: bump orc-rust to 0.2.3
  • Loading branch information
WenyXu authored Jun 25, 2023
1 parent 62f660e commit 223cf31
Show file tree
Hide file tree
Showing 16 changed files with 361 additions and 4 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,5 @@ benchmarks/data

# Vscode workspace
*.code-workspace

venv/
37 changes: 37 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/datasource/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ datafusion.workspace = true
derive_builder = "0.12"
futures.workspace = true
object-store = { path = "../../object-store" }
orc-rust = "0.2.3"
regex = "1.7"
snafu.workspace = true
tokio.workspace = true
Expand Down
10 changes: 9 additions & 1 deletion src/common/datasource/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to build orc reader, source: {}", source))]
OrcReader {
location: Location,
source: orc_rust::error::Error,
},

#[snafu(display("Failed to read object from path: {}, source: {}", path, source))]
ReadObject {
path: String,
Expand Down Expand Up @@ -171,7 +177,8 @@ impl ErrorExt for Error {
| ReadRecordBatch { .. }
| WriteRecordBatch { .. }
| EncodeRecordBatch { .. }
| BufferedWriterClosed { .. } => StatusCode::Unexpected,
| BufferedWriterClosed { .. }
| OrcReader { .. } => StatusCode::Unexpected,
}
}

Expand All @@ -182,6 +189,7 @@ impl ErrorExt for Error {
fn location_opt(&self) -> Option<common_error::snafu::Location> {
use Error::*;
match self {
OrcReader { location, .. } => Some(*location),
BuildBackend { location, .. } => Some(*location),
ReadObject { location, .. } => Some(*location),
ListObjects { location, .. } => Some(*location),
Expand Down
5 changes: 5 additions & 0 deletions src/common/datasource/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

pub mod csv;
pub mod json;
pub mod orc;
pub mod parquet;
#[cfg(test)]
pub mod tests;
Expand All @@ -38,6 +39,7 @@ use snafu::ResultExt;

use self::csv::CsvFormat;
use self::json::JsonFormat;
use self::orc::OrcFormat;
use self::parquet::ParquetFormat;
use crate::buffered_writer::{DfRecordBatchEncoder, LazyBufferedWriter};
use crate::compression::CompressionType;
Expand All @@ -56,6 +58,7 @@ pub enum Format {
Csv(CsvFormat),
Json(JsonFormat),
Parquet(ParquetFormat),
Orc(OrcFormat),
}

impl Format {
Expand All @@ -64,6 +67,7 @@ impl Format {
Format::Csv(_) => ".csv",
Format::Json(_) => ".json",
Format::Parquet(_) => ".parquet",
&Format::Orc(_) => ".orc",
}
}
}
Expand All @@ -81,6 +85,7 @@ impl TryFrom<&HashMap<String, String>> for Format {
"CSV" => Ok(Self::Csv(CsvFormat::try_from(options)?)),
"JSON" => Ok(Self::Json(JsonFormat::try_from(options)?)),
"PARQUET" => Ok(Self::Parquet(ParquetFormat::default())),
"ORC" => Ok(Self::Orc(OrcFormat)),
_ => error::UnsupportedFormatSnafu { format: &format }.fail(),
}
}
Expand Down
102 changes: 102 additions & 0 deletions src/common/datasource/src/file_format/orc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::pin::Pin;
use std::task::{Context, Poll};

use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::arrow::record_batch::RecordBatch as DfRecordBatch;
use datafusion::error::{DataFusionError, Result as DfResult};
use datafusion::physical_plan::RecordBatchStream;
use futures::Stream;
use object_store::ObjectStore;
use orc_rust::arrow_reader::{create_arrow_schema, Cursor};
use orc_rust::async_arrow_reader::ArrowStreamReader;
pub use orc_rust::error::Error as OrcError;
use orc_rust::reader::Reader;
use snafu::ResultExt;
use tokio::io::{AsyncRead, AsyncSeek};

use crate::error::{self, Result};
use crate::file_format::FileFormat;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct OrcFormat;

pub async fn new_orc_cursor<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<Cursor<R>> {
let reader = Reader::new_async(reader)
.await
.context(error::OrcReaderSnafu)?;
let cursor = Cursor::root(reader).context(error::OrcReaderSnafu)?;
Ok(cursor)
}

pub async fn new_orc_stream_reader<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<ArrowStreamReader<R>> {
let cursor = new_orc_cursor(reader).await?;
Ok(ArrowStreamReader::new(cursor, None))
}

pub async fn infer_orc_schema<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<Schema> {
let cursor = new_orc_cursor(reader).await?;
Ok(create_arrow_schema(&cursor))
}

pub struct OrcArrowStreamReaderAdapter<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> {
stream: ArrowStreamReader<T>,
}

impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> OrcArrowStreamReaderAdapter<T> {
pub fn new(stream: ArrowStreamReader<T>) -> Self {
Self { stream }
}
}

impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> RecordBatchStream
for OrcArrowStreamReaderAdapter<T>
{
fn schema(&self) -> SchemaRef {
self.stream.schema()
}
}

impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream for OrcArrowStreamReaderAdapter<T> {
type Item = DfResult<DfRecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let batch = futures::ready!(Pin::new(&mut self.stream).poll_next(cx))
.map(|r| r.map_err(|e| DataFusionError::External(Box::new(e))));
Poll::Ready(batch)
}
}

#[async_trait]
impl FileFormat for OrcFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;

let schema = infer_orc_schema(reader).await?;

Ok(schema)
}
}
11 changes: 11 additions & 0 deletions src/common/datasource/tests/orc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
## Generate orc data

```bash
python3 -m venv venv
venv/bin/pip install -U pip
venv/bin/pip install -U pyorc

./venv/bin/python write.py

cargo test
```
Binary file added src/common/datasource/tests/orc/test.orc
Binary file not shown.
103 changes: 103 additions & 0 deletions src/common/datasource/tests/orc/write.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Copyright 2023 Greptime Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import random
import datetime
import pyorc

data = {
"double_a": [1.0, 2.0, 3.0, 4.0, 5.0],
"a": [1.0, 2.0, None, 4.0, 5.0],
"b": [True, False, None, True, False],
"str_direct": ["a", "cccccc", None, "ddd", "ee"],
"d": ["a", "bb", None, "ccc", "ddd"],
"e": ["ddd", "cc", None, "bb", "a"],
"f": ["aaaaa", "bbbbb", None, "ccccc", "ddddd"],
"int_short_repeated": [5, 5, None, 5, 5],
"int_neg_short_repeated": [-5, -5, None, -5, -5],
"int_delta": [1, 2, None, 4, 5],
"int_neg_delta": [5, 4, None, 2, 1],
"int_direct": [1, 6, None, 3, 2],
"int_neg_direct": [-1, -6, None, -3, -2],
"bigint_direct": [1, 6, None, 3, 2],
"bigint_neg_direct": [-1, -6, None, -3, -2],
"bigint_other": [5, -5, 1, 5, 5],
"utf8_increase": ["a", "bb", "ccc", "dddd", "eeeee"],
"utf8_decrease": ["eeeee", "dddd", "ccc", "bb", "a"],
"timestamp_simple": [datetime.datetime(2023, 4, 1, 20, 15, 30, 2000), datetime.datetime.fromtimestamp(int('1629617204525777000')/1000000000), datetime.datetime(2023, 1, 1), datetime.datetime(2023, 2, 1), datetime.datetime(2023, 3, 1)],
"date_simple": [datetime.date(2023, 4, 1), datetime.date(2023, 3, 1), datetime.date(2023, 1, 1), datetime.date(2023, 2, 1), datetime.date(2023, 3, 1)]
}

def infer_schema(data):
schema = "struct<"
for key, value in data.items():
dt = type(value[0])
if dt == float:
dt = "float"
elif dt == int:
dt = "int"
elif dt == bool:
dt = "boolean"
elif dt == str:
dt = "string"
elif key.startswith("timestamp"):
dt = "timestamp"
elif key.startswith("date"):
dt = "date"
else:
print(key,value,dt)
raise NotImplementedError
if key.startswith("double"):
dt = "double"
if key.startswith("bigint"):
dt = "bigint"
schema += key + ":" + dt + ","

schema = schema[:-1] + ">"
return schema



def _write(
schema: str,
data,
file_name: str,
compression=pyorc.CompressionKind.NONE,
dict_key_size_threshold=0.0,
):
output = open(file_name, "wb")
writer = pyorc.Writer(
output,
schema,
dict_key_size_threshold=dict_key_size_threshold,
# use a small number to ensure that compression crosses value boundaries
compression_block_size=32,
compression=compression,
)
num_rows = len(list(data.values())[0])
for x in range(num_rows):
row = tuple(values[x] for values in data.values())
writer.write(row)
writer.close()

with open(file_name, "rb") as f:
reader = pyorc.Reader(f)
list(reader)


_write(
infer_schema(data),
data,
"test.orc",
)
7 changes: 6 additions & 1 deletion src/file-table-engine/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::any::Any;

use common_datasource::file_format::Format;
use common_error::prelude::*;
use datafusion::arrow::error::ArrowError;
use datafusion::error::DataFusionError;
Expand Down Expand Up @@ -175,6 +176,9 @@ pub enum Error {
source: datatypes::error::Error,
location: Location,
},

#[snafu(display("Unsupported format: {:?}", format))]
UnsupportedFormat { format: Format, location: Location },
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -191,7 +195,8 @@ impl ErrorExt for Error {
| BuildCsvConfig { .. }
| ProjectSchema { .. }
| MissingRequiredField { .. }
| ConvertSchema { .. } => StatusCode::InvalidArguments,
| ConvertSchema { .. }
| UnsupportedFormat { .. } => StatusCode::InvalidArguments,

BuildBackend { source, .. } => source.status_code(),
BuildStreamAdapter { source, .. } => source.status_code(),
Expand Down
Loading

0 comments on commit 223cf31

Please sign in to comment.