From 223cf31409b3adb31a63ad1d56fbdab8728c1a93 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Sun, 25 Jun 2023 15:07:16 +0900 Subject: [PATCH] feat: support to copy from orc format (#1814) * feat: support to copy from orc format * test: add copy from orc test * chore: add license header * refactor: remove unimplemented macro * chore: apply suggestions from CR * chore: bump orc-rust to 0.2.3 --- .gitignore | 2 + Cargo.lock | 37 +++++++ src/common/datasource/Cargo.toml | 1 + src/common/datasource/src/error.rs | 10 +- src/common/datasource/src/file_format.rs | 5 + src/common/datasource/src/file_format/orc.rs | 102 +++++++++++++++++ src/common/datasource/tests/orc/README.md | 11 ++ src/common/datasource/tests/orc/test.orc | Bin 0 -> 2822 bytes src/common/datasource/tests/orc/write.py | 103 ++++++++++++++++++ src/file-table-engine/src/error.rs | 7 +- src/file-table-engine/src/table/format.rs | 2 + src/frontend/src/error.rs | 17 ++- src/frontend/src/statement/copy_table_from.rs | 27 +++++ src/frontend/src/statement/copy_table_to.rs | 1 + src/query/src/sql.rs | 1 + tests-integration/src/tests/instance_test.rs | 39 +++++++ 16 files changed, 361 insertions(+), 4 deletions(-) create mode 100644 src/common/datasource/src/file_format/orc.rs create mode 100644 src/common/datasource/tests/orc/README.md create mode 100644 src/common/datasource/tests/orc/test.orc create mode 100644 src/common/datasource/tests/orc/write.py diff --git a/.gitignore b/.gitignore index 4eb4a7e970a6..4db155f85ff3 100644 --- a/.gitignore +++ b/.gitignore @@ -44,3 +44,5 @@ benchmarks/data # Vscode workspace *.code-workspace + +venv/ diff --git a/Cargo.lock b/Cargo.lock index d9e8c1ca5287..c75aabea7190 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1679,6 +1679,7 @@ dependencies = [ "derive_builder 0.12.0", "futures", "object-store", + "orc-rust", "paste", "regex", "snafu", @@ -3069,6 +3070,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fastrand" version = "1.9.0" @@ -5983,6 +5990,27 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978aa494585d3ca4ad74929863093e87cac9790d81fe7aba2b3dc2890643a0fc" +[[package]] +name = "orc-rust" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e15d3f67795da54d9526e46b7808181ce6236d518f56ca1ee556d3a3fdd77c66" +dependencies = [ + "arrow", + "bytes", + "chrono", + "fallible-streaming-iterator", + "flate2", + "futures", + "futures-util", + "lazy_static", + "paste", + "prost", + "snafu", + "tokio", + "zigzag", +] + [[package]] name = "ordered-float" version = "1.1.1" @@ -11214,6 +11242,15 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9" +[[package]] +name = "zigzag" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70b40401a28d86ce16a330b863b86fd7dbee4d7c940587ab09ab8c019f9e3fdf" +dependencies = [ + "num-traits", +] + [[package]] name = "zstd" version = "0.11.2+zstd.1.5.2" diff --git a/src/common/datasource/Cargo.toml b/src/common/datasource/Cargo.toml index 988bd6fb0ddf..8c1d3c793b06 100644 --- 
a/src/common/datasource/Cargo.toml
+++ b/src/common/datasource/Cargo.toml
@@ -24,6 +24,7 @@ datafusion.workspace = true
 derive_builder = "0.12"
 futures.workspace = true
 object-store = { path = "../../object-store" }
+orc-rust = "0.2.3"
 regex = "1.7"
 snafu.workspace = true
 tokio.workspace = true
diff --git a/src/common/datasource/src/error.rs b/src/common/datasource/src/error.rs
index 7c7db62df5c5..f454910d8077 100644
--- a/src/common/datasource/src/error.rs
+++ b/src/common/datasource/src/error.rs
@@ -54,6 +54,12 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Failed to build orc reader, source: {}", source))]
+    OrcReader {
+        location: Location,
+        source: orc_rust::error::Error,
+    },
+
     #[snafu(display("Failed to read object from path: {}, source: {}", path, source))]
     ReadObject {
         path: String,
@@ -171,7 +177,8 @@ impl ErrorExt for Error {
             | ReadRecordBatch { .. }
             | WriteRecordBatch { .. }
             | EncodeRecordBatch { .. }
-            | BufferedWriterClosed { .. } => StatusCode::Unexpected,
+            | BufferedWriterClosed { .. }
+            | OrcReader { .. } => StatusCode::Unexpected,
         }
     }
 
@@ -182,6 +189,7 @@ impl ErrorExt for Error {
     fn location_opt(&self) -> Option<Location> {
         use Error::*;
         match self {
+            OrcReader { location, .. } => Some(*location),
             BuildBackend { location, .. } => Some(*location),
             ReadObject { location, .. } => Some(*location),
             ListObjects { location, .. } => Some(*location),
diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs
index 81c7443d8d22..6cd0c7861516 100644
--- a/src/common/datasource/src/file_format.rs
+++ b/src/common/datasource/src/file_format.rs
@@ -14,6 +14,7 @@
 pub mod csv;
 pub mod json;
+pub mod orc;
 pub mod parquet;
 #[cfg(test)]
 pub mod tests;
@@ -38,6 +39,7 @@
 use snafu::ResultExt;
 use self::csv::CsvFormat;
 use self::json::JsonFormat;
+use self::orc::OrcFormat;
 use self::parquet::ParquetFormat;
 use crate::buffered_writer::{DfRecordBatchEncoder, LazyBufferedWriter};
 use crate::compression::CompressionType;
@@ -56,6 +58,7 @@ pub enum Format {
     Csv(CsvFormat),
     Json(JsonFormat),
     Parquet(ParquetFormat),
+    Orc(OrcFormat),
 }
 
 impl Format {
@@ -64,6 +67,7 @@
             Format::Csv(_) => ".csv",
             Format::Json(_) => ".json",
             Format::Parquet(_) => ".parquet",
+            Format::Orc(_) => ".orc",
         }
     }
 }
@@ -81,6 +85,7 @@ impl TryFrom<&HashMap<String, String>> for Format {
             "CSV" => Ok(Self::Csv(CsvFormat::try_from(options)?)),
             "JSON" => Ok(Self::Json(JsonFormat::try_from(options)?)),
             "PARQUET" => Ok(Self::Parquet(ParquetFormat::default())),
+            "ORC" => Ok(Self::Orc(OrcFormat)),
             _ => error::UnsupportedFormatSnafu { format: &format }.fail(),
         }
     }
diff --git a/src/common/datasource/src/file_format/orc.rs b/src/common/datasource/src/file_format/orc.rs
new file mode 100644
index 000000000000..fb228ee1dbca
--- /dev/null
+++ b/src/common/datasource/src/file_format/orc.rs
@@ -0,0 +1,102 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use arrow_schema::{Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion::arrow::record_batch::RecordBatch as DfRecordBatch;
+use datafusion::error::{DataFusionError, Result as DfResult};
+use datafusion::physical_plan::RecordBatchStream;
+use futures::Stream;
+use object_store::ObjectStore;
+use orc_rust::arrow_reader::{create_arrow_schema, Cursor};
+use orc_rust::async_arrow_reader::ArrowStreamReader;
+pub use orc_rust::error::Error as OrcError;
+use orc_rust::reader::Reader;
+use snafu::ResultExt;
+use tokio::io::{AsyncRead, AsyncSeek};
+
+use crate::error::{self, Result};
+use crate::file_format::FileFormat;
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub struct OrcFormat;
+
+pub async fn new_orc_cursor<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
+    reader: R,
+) -> Result<Cursor<R>> {
+    let reader = Reader::new_async(reader)
+        .await
+        .context(error::OrcReaderSnafu)?;
+    let cursor = Cursor::root(reader).context(error::OrcReaderSnafu)?;
+    Ok(cursor)
+}
+
+pub async fn new_orc_stream_reader<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
+    reader: R,
+) -> Result<ArrowStreamReader<R>> {
+    let cursor = new_orc_cursor(reader).await?;
+    Ok(ArrowStreamReader::new(cursor, None))
+}
+
+pub async fn infer_orc_schema<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
+    reader: R,
+) -> Result<Schema> {
+    let cursor = new_orc_cursor(reader).await?;
+    Ok(create_arrow_schema(&cursor))
+}
+
+pub struct OrcArrowStreamReaderAdapter<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> {
+    stream: ArrowStreamReader<T>,
+}
+
+impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> OrcArrowStreamReaderAdapter<T> {
+    pub fn new(stream: ArrowStreamReader<T>) -> Self {
+        Self { stream }
+    }
+}
+
+impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> RecordBatchStream
+    for OrcArrowStreamReaderAdapter<T>
+{
+    fn schema(&self) -> SchemaRef {
+        self.stream.schema()
+    }
+}
+
+impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream for OrcArrowStreamReaderAdapter<T> {
+    type Item = DfResult<DfRecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let batch = futures::ready!(Pin::new(&mut self.stream).poll_next(cx))
+            .map(|r| r.map_err(|e| DataFusionError::External(Box::new(e))));
+        Poll::Ready(batch)
+    }
+}
+
+#[async_trait]
+impl FileFormat for OrcFormat {
+    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
+        let reader = store
+            .reader(path)
+            .await
+            .context(error::ReadObjectSnafu { path })?;
+
+        let schema = infer_orc_schema(reader).await?;
+
+        Ok(schema)
+    }
+}
diff --git a/src/common/datasource/tests/orc/README.md b/src/common/datasource/tests/orc/README.md
new file mode 100644
index 000000000000..a6d75884d935
--- /dev/null
+++ b/src/common/datasource/tests/orc/README.md
@@ -0,0 +1,11 @@
+## Generate ORC data
+
+```bash
+python3 -m venv venv
+venv/bin/pip install -U pip
+venv/bin/pip install -U pyorc
+
+./venv/bin/python write.py
+
+cargo test
+```
diff --git a/src/common/datasource/tests/orc/test.orc b/src/common/datasource/tests/orc/test.orc
new file mode 100644
index 0000000000000000000000000000000000000000..1a550407750111c42c934c3cb7d971fa90405c74
GIT binary patch
literal 2822
zcmcIm&u<$=6rP#gSq*{s;Ar3Su^jd2jZyQ(nCCBB!
z0ae`KRB?&KDF-eU5|ye7ao~UuH~0f0#Jwk^8~_C*WZup^FNw4H!GTfi?|t8UZ{}sk z@7XtREy<#E2qB>>_zIC%r3-|Fmj#bz{nRAn;6~LN_4XxeEVPe>x@Uwo+bdMoq-PUD zU%1yCNg0TjGO;4uhA(LxvLw+OTv6sy;F77!>GgU#Jj18JH=pHAL!Od6PFKhw7qXiW zF(tXFBn*TbdO|9hC;Q6RS58JVh)&C!*yL0>$crv2s%P>xxo9+px_zi*8t&kQ>>myV zZv^Rs;z_#n+_~PDWWinv2_wPZsq3Ggee%<l4k`Dr*~k5Z5nVA**Ee9O-4d&lg< zW%u#5z0u)!E@jzKa5}715s_V{Wrg034~7@aDM1R64}lOhuM{97)~enb_K7m)|Z z15+PSCFCXKW#nb#733A<3c14ls)jRFHGT?MF?EKhhp30B*D$}Pry-83nwBQMj(S}O zFmmDf=e{$e|F7?nbNl@z`3IAB@LyoyUl!*M`Xl50#85vu%8x7Q06%SPAD`Bijfb>l z;~|{S>}<8Z@cmR)-PoMKp=%-6-sD4m&$*TZ3U| zkZdRMFzH3pskWKio7DYO?py=75G_3#afHJWg4w@expIid5zZkx6xXYjn+EeR`8o?LV-qWoziG6 z+Z6b-2pKjSGHkR{m}w^TVWVY{`Mx8K*2-zBTX#U9-y^nu$oSV0nX8r+bkdr3> literal 0 HcmV?d00001 diff --git a/src/common/datasource/tests/orc/write.py b/src/common/datasource/tests/orc/write.py new file mode 100644 index 000000000000..28db5fa0c92e --- /dev/null +++ b/src/common/datasource/tests/orc/write.py @@ -0,0 +1,103 @@ +# Copyright 2023 Greptime Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import random +import datetime +import pyorc + +data = { + "double_a": [1.0, 2.0, 3.0, 4.0, 5.0], + "a": [1.0, 2.0, None, 4.0, 5.0], + "b": [True, False, None, True, False], + "str_direct": ["a", "cccccc", None, "ddd", "ee"], + "d": ["a", "bb", None, "ccc", "ddd"], + "e": ["ddd", "cc", None, "bb", "a"], + "f": ["aaaaa", "bbbbb", None, "ccccc", "ddddd"], + "int_short_repeated": [5, 5, None, 5, 5], + "int_neg_short_repeated": [-5, -5, None, -5, -5], + "int_delta": [1, 2, None, 4, 5], + "int_neg_delta": [5, 4, None, 2, 1], + "int_direct": [1, 6, None, 3, 2], + "int_neg_direct": [-1, -6, None, -3, -2], + "bigint_direct": [1, 6, None, 3, 2], + "bigint_neg_direct": [-1, -6, None, -3, -2], + "bigint_other": [5, -5, 1, 5, 5], + "utf8_increase": ["a", "bb", "ccc", "dddd", "eeeee"], + "utf8_decrease": ["eeeee", "dddd", "ccc", "bb", "a"], + "timestamp_simple": [datetime.datetime(2023, 4, 1, 20, 15, 30, 2000), datetime.datetime.fromtimestamp(int('1629617204525777000')/1000000000), datetime.datetime(2023, 1, 1), datetime.datetime(2023, 2, 1), datetime.datetime(2023, 3, 1)], + "date_simple": [datetime.date(2023, 4, 1), datetime.date(2023, 3, 1), datetime.date(2023, 1, 1), datetime.date(2023, 2, 1), datetime.date(2023, 3, 1)] +} + +def infer_schema(data): + schema = "struct<" + for key, value in data.items(): + dt = type(value[0]) + if dt == float: + dt = "float" + elif dt == int: + dt = "int" + elif dt == bool: + dt = "boolean" + elif dt == str: + dt = "string" + elif key.startswith("timestamp"): + dt = "timestamp" + elif key.startswith("date"): + dt = "date" + else: + print(key,value,dt) + raise NotImplementedError + if key.startswith("double"): + dt = "double" + if key.startswith("bigint"): + dt = "bigint" + schema += key + ":" + dt + "," + + schema = schema[:-1] + ">" + return schema + + + +def _write( + schema: str, + data, + file_name: str, + compression=pyorc.CompressionKind.NONE, + dict_key_size_threshold=0.0, +): + output = open(file_name, "wb") + writer = pyorc.Writer( + 
        output,
+        schema,
+        dict_key_size_threshold=dict_key_size_threshold,
+        # use a small number to ensure that compression crosses value boundaries
+        compression_block_size=32,
+        compression=compression,
+    )
+    num_rows = len(list(data.values())[0])
+    for x in range(num_rows):
+        row = tuple(values[x] for values in data.values())
+        writer.write(row)
+    writer.close()
+
+    with open(file_name, "rb") as f:
+        reader = pyorc.Reader(f)
+        list(reader)
+
+
+_write(
+    infer_schema(data),
+    data,
+    "test.orc",
+)
diff --git a/src/file-table-engine/src/error.rs b/src/file-table-engine/src/error.rs
index ac00ebeb2cbf..7544f147ab0c 100644
--- a/src/file-table-engine/src/error.rs
+++ b/src/file-table-engine/src/error.rs
@@ -14,6 +14,7 @@
 use std::any::Any;
 
+use common_datasource::file_format::Format;
 use common_error::prelude::*;
 use datafusion::arrow::error::ArrowError;
 use datafusion::error::DataFusionError;
@@ -175,6 +176,9 @@ pub enum Error {
         source: datatypes::error::Error,
         location: Location,
     },
+
+    #[snafu(display("Unsupported format: {:?}", format))]
+    UnsupportedFormat { format: Format, location: Location },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -191,7 +195,8 @@ impl ErrorExt for Error {
             | BuildCsvConfig { .. }
             | ProjectSchema { .. }
            | MissingRequiredField { .. }
-            | ConvertSchema { .. } => StatusCode::InvalidArguments,
+            | ConvertSchema { .. }
+            | UnsupportedFormat { .. } => StatusCode::InvalidArguments,
 
             BuildBackend { source, .. } => source.status_code(),
             BuildStreamAdapter { source, .. } => source.status_code(),
diff --git a/src/file-table-engine/src/table/format.rs b/src/file-table-engine/src/table/format.rs
index ee3f0262ddac..99e6e4973d91 100644
--- a/src/file-table-engine/src/table/format.rs
+++ b/src/file-table-engine/src/table/format.rs
@@ -361,6 +361,7 @@ pub fn create_physical_plan(
         Format::Csv(format) => new_csv_scan_plan(ctx, config, format),
         Format::Json(format) => new_json_scan_plan(ctx, config, format),
         Format::Parquet(format) => new_parquet_scan_plan(ctx, config, format),
+        _ => error::UnsupportedFormatSnafu { format: *format }.fail(),
     }
 }
 
@@ -373,5 +374,6 @@ pub fn create_stream(
         Format::Csv(format) => new_csv_stream(ctx, config, format),
         Format::Json(format) => new_json_stream(ctx, config, format),
         Format::Parquet(format) => new_parquet_stream(ctx, config, format),
+        _ => error::UnsupportedFormatSnafu { format: *format }.fail(),
     }
 }
diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs
index bdfa8d697dcb..415bb3f3ac3c 100644
--- a/src/frontend/src/error.rs
+++ b/src/frontend/src/error.rs
@@ -14,6 +14,7 @@
 use std::any::Any;
 
+use common_datasource::file_format::Format;
 use common_error::prelude::*;
 use datafusion::parquet;
 use datatypes::arrow::error::ArrowError;
@@ -443,6 +444,9 @@ pub enum Error {
         source: common_datasource::error::Error,
     },
 
+    #[snafu(display("Unsupported format: {:?}", format))]
+    UnsupportedFormat { location: Location, format: Format },
+
     #[snafu(display("Failed to parse file format, source: {}", source))]
     ParseFileFormat {
         #[snafu(backtrace)]
         source: common_datasource::error::Error,
@@ -500,6 +504,12 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Failed to read orc schema, source: {}", source))]
+    ReadOrc {
+        source: common_datasource::error::Error,
+        location: Location,
+    },
+
     #[snafu(display("Failed to build parquet record batch stream, source: {}", source))]
     BuildParquetRecordBatchStream {
         location: Location,
@@ -575,7 +585,8 @@ impl ErrorExt for Error {
             | Error::InvalidSchema { .. }
             | Error::PrepareImmutableTable { .. }
             | Error::BuildCsvConfig { .. }
-            | Error::ProjectSchema { .. } => StatusCode::InvalidArguments,
+            | Error::ProjectSchema { .. }
+            | Error::UnsupportedFormat { .. } => StatusCode::InvalidArguments,
 
             Error::NotSupported { .. } => StatusCode::Unsupported,
 
@@ -667,7 +678,9 @@
 
             Error::TableScanExec { source, .. } => source.status_code(),
 
-            Error::ReadObject { .. } | Error::ReadParquet { .. } => StatusCode::StorageUnavailable,
+            Error::ReadObject { .. } | Error::ReadParquet { .. } | Error::ReadOrc { .. } => {
+                StatusCode::StorageUnavailable
+            }
 
             Error::ListObjects { source }
             | Error::ParseUrl { source }
diff --git a/src/frontend/src/statement/copy_table_from.rs b/src/frontend/src/statement/copy_table_from.rs
index 617195a5e9df..b79bc1d45d14 100644
--- a/src/frontend/src/statement/copy_table_from.rs
+++ b/src/frontend/src/statement/copy_table_from.rs
@@ -20,6 +20,9 @@ use async_compat::CompatExt;
 use common_base::readable_size::ReadableSize;
 use common_datasource::file_format::csv::{CsvConfigBuilder, CsvOpener};
 use common_datasource::file_format::json::JsonOpener;
+use common_datasource::file_format::orc::{
+    infer_orc_schema, new_orc_stream_reader, OrcArrowStreamReaderAdapter,
+};
 use common_datasource::file_format::{FileFormat, Format};
 use common_datasource::lister::{Lister, Source};
 use common_datasource::object_store::{build_backend, parse_url};
@@ -110,6 +113,18 @@ impl StatementExecutor {
                     .context(error::ReadParquetSnafu)?;
                 Ok(builder.schema().clone())
             }
+            Format::Orc(_) => {
+                let reader = object_store
+                    .reader(path)
+                    .await
+                    .context(error::ReadObjectSnafu { path })?;
+
+                let schema = infer_orc_schema(reader)
+                    .await
+                    .context(error::ReadOrcSnafu)?;
+
+                Ok(Arc::new(schema))
+            }
         }
     }
 
@@ -201,6 +216,18 @@ impl StatementExecutor {
 
                 Ok(Box::pin(ParquetRecordBatchStreamAdapter::new(upstream)))
             }
+            Format::Orc(_) => {
+                let reader = object_store
+                    .reader(path)
+                    .await
+                    .context(error::ReadObjectSnafu { path })?;
+                let stream = new_orc_stream_reader(reader)
+                    .await
+                    .context(error::ReadOrcSnafu)?;
+                let stream = OrcArrowStreamReaderAdapter::new(stream);
+
+                Ok(Box::pin(stream))
+            }
         }
     }
 
diff --git a/src/frontend/src/statement/copy_table_to.rs b/src/frontend/src/statement/copy_table_to.rs
index 77fba4a49925..73852fa5222b 100644
--- a/src/frontend/src/statement/copy_table_to.rs
+++ b/src/frontend/src/statement/copy_table_to.rs
@@ -68,6 +68,7 @@ impl StatementExecutor {
 
                 Ok(rows_copied)
             }
+            _ => error::UnsupportedFormatSnafu { format: *format }.fail(),
         }
     }
 
diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs
index 73e0f800ba17..0757938952ac 100644
--- a/src/query/src/sql.rs
+++ b/src/query/src/sql.rs
@@ -332,6 +332,7 @@ fn parse_immutable_file_table_format(
             Format::Csv(format) => Box::new(format),
             Format::Json(format) => Box::new(format),
             Format::Parquet(format) => Box::new(format),
+            Format::Orc(format) => Box::new(format),
         },
     )
 }
diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs
index b890c3c01319..27c6df82b92f 100644
--- a/tests-integration/src/tests/instance_test.rs
+++ b/tests-integration/src/tests/instance_test.rs
@@ -1227,6 +1227,45 @@ async fn test_execute_copy_from_s3(instance: Arc<dyn MockInstance>) {
     }
 }
 
+#[apply(both_instances_cases)]
+async fn test_execute_copy_from_orc(instance: Arc<dyn MockInstance>) {
+    logging::init_default_ut_logging();
+    let instance = instance.frontend();
+
+    // setups
+    execute_sql(
+        &instance,
+        "create table demo(double_a double, a float, b boolean, str_direct string, d string, e string, f string, int_short_repeated int,
int_neg_short_repeated int, int_delta int, int_neg_delta int, int_direct int, int_neg_direct int, bigint_direct bigint, bigint_neg_direct bigint, bigint_other bigint, utf8_increase string, utf8_decrease string, timestamp_simple timestamp(9) time index, date_simple date);", + ) + .await; + + let filepath = get_data_dir("../src/common/datasource/tests/orc/test.orc") + .canonicalize() + .unwrap() + .display() + .to_string(); + + let output = execute_sql( + &instance, + &format!("copy demo from '{}' WITH(FORMAT='orc');", &filepath), + ) + .await; + + assert!(matches!(output, Output::AffectedRows(5))); + + let output = execute_sql(&instance, "select * from demo order by double_a;").await; + let expected = r#"+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+ +| double_a | a | b | str_direct | d | e | f | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple | date_simple | ++----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+ +| 1.0 | 1.0 | true | a | a | ddd | aaaaa | 5 | -5 | 1 | 5 | 1 | -1 | 1 | -1 | 5 | a | eeeee | 2023-04-01T20:15:30.002 | 2023-04-01 | +| 2.0 | 2.0 | false | cccccc | bb | cc | bbbbb | 5 | -5 | 2 | 4 | 6 | -6 | 6 | -6 | -5 | bb | dddd | 2021-08-22T07:26:44.525777 | 2023-03-01 | +| 3.0 | | | | | | | | | | | | | | | 1 | ccc | ccc | 2023-01-01T00:00:00 | 2023-01-01 | +| 4.0 | 4.0 | true | ddd | ccc | bb | ccccc | 5 | -5 | 4 | 2 | 3 | -3 | 3 | -3 | 5 | dddd | bb | 2023-02-01T00:00:00 | 2023-02-01 | +| 5.0 | 5.0 | false | ee | ddd | a | ddddd | 5 | -5 | 5 | 1 | 2 | -2 | 2 | -2 | 5 | eeeee | a | 2023-03-01T00:00:00 | 2023-03-01 | ++----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+"#; + check_output_stream(output, expected).await; +} + #[apply(both_instances_cases)] async fn test_cast_type_issue_1594(instance: Arc) { let instance = instance.frontend();