feat: init iceberg writer #275

Merged 3 commits on Apr 22, 2024
1 change: 1 addition & 0 deletions Cargo.toml
@@ -40,6 +40,7 @@ array-init = "2"
arrow-arith = { version = "51" }
arrow-array = { version = "51" }
arrow-schema = { version = "51" }
arrow-select = { version = "51" }
async-stream = "0.3.5"
async-trait = "0.1"
bimap = "0.6"
1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
@@ -35,6 +35,7 @@ array-init = { workspace = true }
arrow-arith = { workspace = true }
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
async-stream = { workspace = true }
async-trait = { workspace = true }
bimap = { workspace = true }
318 changes: 318 additions & 0 deletions crates/iceberg/src/writer/base_writer/data_file_writer.rs
@@ -0,0 +1,318 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module provides `DataFileWriter`.

use crate::spec::{DataContentType, DataFile, Struct};
use crate::writer::file_writer::FileWriter;
use crate::writer::CurrentFileStatus;
use crate::writer::{file_writer::FileWriterBuilder, IcebergWriter, IcebergWriterBuilder};
use crate::Result;
use arrow_array::RecordBatch;
use itertools::Itertools;

/// Builder for `DataFileWriter`.
#[derive(Clone)]
pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
inner: B,
}

impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
/// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
pub fn new(inner: B) -> Self {
Self { inner }
}
}

/// Config for `DataFileWriter`.
pub struct DataFileWriterConfig {
partition_value: Struct,
}

impl DataFileWriterConfig {
/// Create a new `DataFileWriterConfig` with partition value.
pub fn new(partition_value: Option<Struct>) -> Self {
Self {
partition_value: partition_value.unwrap_or(Struct::empty()),
}
}
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
type R = DataFileWriter<B>;
type C = DataFileWriterConfig;

async fn build(self, config: Self::C) -> Result<Self::R> {
Ok(DataFileWriter {
inner_writer: Some(self.inner.clone().build().await?),
partition_value: config.partition_value,
})
}
}

/// A writer that writes data files within one spec/partition.
pub struct DataFileWriter<B: FileWriterBuilder> {
inner_writer: Option<B::R>,
partition_value: Struct,
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
async fn write(&mut self, batch: RecordBatch) -> Result<()> {
self.inner_writer.as_mut().unwrap().write(&batch).await
}

async fn close(&mut self) -> Result<Vec<DataFile>> {
let writer = self.inner_writer.take().unwrap();
Ok(writer
.close()
.await?
.into_iter()
.map(|mut res| {
res.content(DataContentType::Data);
res.partition(self.partition_value.clone());
res.build().expect("Guaranteed to be valid")
})
.collect_vec())
}
}

impl<B: FileWriterBuilder> CurrentFileStatus for DataFileWriter<B> {
fn current_file_path(&self) -> String {
self.inner_writer.as_ref().unwrap().current_file_path()
}

fn current_row_num(&self) -> usize {
self.inner_writer.as_ref().unwrap().current_row_num()
}

fn current_written_size(&self) -> usize {
self.inner_writer.as_ref().unwrap().current_written_size()
}
}

#[cfg(test)]
mod test {
use std::{collections::HashMap, sync::Arc};

use arrow_array::{types::Int64Type, ArrayRef, Int64Array, RecordBatch, StructArray};
use parquet::{arrow::PARQUET_FIELD_ID_META_KEY, file::properties::WriterProperties};
use tempfile::TempDir;

use crate::{
io::FileIOBuilder,
spec::DataFileFormat,
writer::{
base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig},
file_writer::{
location_generator::{test::MockLocationGenerator, DefaultFileNameGenerator},
ParquetWriterBuilder,
},
tests::check_parquet_data_file,
IcebergWriter, IcebergWriterBuilder,
},
};

#[tokio::test]
async fn test_data_file_writer() -> Result<(), anyhow::Error> {
let temp_dir = TempDir::new().unwrap();
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
let location_gen =
MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
let file_name_gen =
DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

// prepare data
// Int, Struct(Int), String, List(Int), Struct(Struct(Int))
let schema = {
let fields = vec![
arrow_schema::Field::new("col0", arrow_schema::DataType::Int64, true)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"0".to_string(),
)])),
arrow_schema::Field::new(
"col1",
arrow_schema::DataType::Struct(
vec![arrow_schema::Field::new(
"sub_col",
arrow_schema::DataType::Int64,
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"5".to_string(),
)]))]
.into(),
),
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
arrow_schema::Field::new("col2", arrow_schema::DataType::Utf8, true).with_metadata(
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
),
arrow_schema::Field::new(
"col3",
arrow_schema::DataType::List(Arc::new(
arrow_schema::Field::new("item", arrow_schema::DataType::Int64, true)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"6".to_string(),
)])),
)),
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"3".to_string(),
)])),
arrow_schema::Field::new(
"col4",
arrow_schema::DataType::Struct(
vec![arrow_schema::Field::new(
"sub_col",
arrow_schema::DataType::Struct(
vec![arrow_schema::Field::new(
"sub_sub_col",
arrow_schema::DataType::Int64,
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"7".to_string(),
)]))]
.into(),
),
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"8".to_string(),
)]))]
.into(),
),
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"4".to_string(),
)])),
];
Arc::new(arrow_schema::Schema::new(fields))
};
let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
let col1 = Arc::new(StructArray::new(
vec![
arrow_schema::Field::new("sub_col", arrow_schema::DataType::Int64, true)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"5".to_string(),
)])),
]
.into(),
vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
None,
));
let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![
"test";
1024
])) as ArrayRef;
let col3 = Arc::new({
let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(
vec![Some(1),]
);
1024
])
.into_parts();
arrow_array::ListArray::new(
Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"6".to_string(),
)]))),
list_parts.1,
list_parts.2,
list_parts.3,
)
}) as ArrayRef;
let col4 = Arc::new(StructArray::new(
vec![arrow_schema::Field::new(
"sub_col",
arrow_schema::DataType::Struct(
vec![arrow_schema::Field::new(
"sub_sub_col",
arrow_schema::DataType::Int64,
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"7".to_string(),
)]))]
.into(),
),
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"8".to_string(),
)]))]
.into(),
vec![Arc::new(StructArray::new(
vec![
arrow_schema::Field::new("sub_sub_col", arrow_schema::DataType::Int64, true)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"7".to_string(),
)])),
]
.into(),
vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
None,
))],
None,
));
let to_write =
RecordBatch::try_new(schema.clone(), vec![col0, col1, col2, col3, col4]).unwrap();

// prepare writer
let pb = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
to_write.schema(),
file_io.clone(),
location_gen,
file_name_gen,
);
let mut data_file_writer = DataFileWriterBuilder::new(pb)

Contributor

It feels a little bit odd to pass a builder to another builder. Could we do something like this?

DataFileWriter
    .builder()
    // this would call ParquetWriter::builder() to get a `ParquetWriterBuilder`,
    // and then pass ownership of the `DataFileWriter` to the `ParquetWriterBuilder`,
    // returning the `ParquetWriterBuilder`
    .with_writer(ParquetWriter)  
    // these calls happen in the `ParquetWriterBuilder`, 
    // allowing customization of the wrapped concrete writer
    .with_foo()
    .with_bar()
    // this finalizes the `ParquetWriterBuilder`, building a
    // `ParquetWriter`, and returns the `DataFileWriterBuilder`
    // that was passed earlier, after first passing in the `ParquetWriter`
    .build_writer()
    // these calls now happen on the `DataFileWriterBuilder`,
    // allowing further setup of the `DataFileWriter`
    .with_baz()
    .with_quux()
    // finally returns a `DataFileWriter`, or perhaps a
    // `Result<DataFileWriter>` or Future of one of those
    .build() 

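For contrast, the nested-builder style that this PR actually implements reads roughly as follows at a call site. This is a condensed sketch based on the test in this file: `write_one_batch` is a hypothetical helper name, the generator types are the test doubles used above, and imports are omitted.

async fn write_one_batch(
    batch: RecordBatch,
    file_io: FileIO,
    location_gen: MockLocationGenerator,
    file_name_gen: DefaultFileNameGenerator,
) -> Result<Vec<DataFile>> {
    // Inner builder: knows how to produce Parquet files at generated locations.
    let parquet_builder = ParquetWriterBuilder::new(
        WriterProperties::builder().build(),
        batch.schema(),
        file_io,
        location_gen,
        file_name_gen,
    );
    // Outer builder: wraps the inner builder and, on close(), fills in the
    // DataFile metadata (content type, partition value).
    let mut writer = DataFileWriterBuilder::new(parquet_builder)
        .build(DataFileWriterConfig::new(None))
        .await?;
    writer.write(batch).await?;
    writer.close().await
}
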
Contributor Author

This top-down style looks good to me as a way to avoid passing builders to each other. However, I am not sure whether it will introduce more complexity in the future.

Actually, the current API can already support this, because it doesn't restrict how the builder is created and used, as the following shows. So I think this is more a matter of the implementation style of our builders than of the API design.

#[derive(Clone)]
struct B;

impl B {
    pub fn new() -> Self {
        Self
    }

    pub fn with_config(&mut self, _: ()) -> &mut Self {
        self
    }
}

#[async_trait::async_trait]
impl IcebergWriterBuilder for B {
    type R = BW;
    type C = ();
    async fn build(self, _: Self::C) -> Result<Self::R> {
        Ok(BW)
    }
}

struct BW;

impl BW {
    pub fn builder() -> B {
        B::new()
    }
}

#[async_trait::async_trait]
impl IcebergWriter for BW {
    async fn write(&mut self, _input: DefaultInput) -> Result<()> {
        Ok(())
    }

    async fn flush(&mut self) -> Result<DefaultOutput> {
        Ok(vec![])
    }
}

#[derive(Clone)]
struct A<I> {
    inner: Option<I>,
}

impl<I: IcebergWriterBuilder> A<I> {
    pub fn new() -> Self {
        Self { inner: None }
    }

    pub fn with_builder(&mut self, builder: I) -> &mut I {
        self.inner = Some(builder);
        self.inner.as_mut().unwrap()
    }

    pub fn with_config(&mut self, _: ()) -> &mut Self {
        self
    }
}

struct AW;

impl AW {
    pub fn builder<I: IcebergWriterBuilder>() -> A<I> {
        A::<I>::new()
    }
}

#[async_trait::async_trait]
impl<I: IcebergWriterBuilder> IcebergWriterBuilder for A<I> {
    type R = AW;
    type C = ();
    async fn build(self, _: Self::C) -> Result<Self::R> {
        Ok(AW)
    }
}

#[async_trait::async_trait]
impl IcebergWriter for AW {
    async fn write(&mut self, _input: DefaultInput) -> Result<()> {
        Ok(())
    }

    async fn flush(&mut self) -> Result<DefaultOutput> {
        Ok(vec![])
    }
}

async fn test() {
    let mut a = AW::builder();
    a
        // configure the first A
        .with_config(())
        .with_builder(AW::builder())
        // configure the second A
        .with_config(())
        .with_builder(BW::builder())
        // configure BW
        .with_config(());
    let writer = a.build(()).await;
}

Contributor Author

Both ways look good to me. cc @Fokko @Xuanwo @liurenjie1024

Member

I remember that we have discussed this before...

Contributor Author

Do you mean this: #135 (comment)? Indeed, they look similar. 🥵

Member

Maybe we can move on for now if both ways look good?

Contributor Author

LGTM.

.build(DataFileWriterConfig::new(None))
.await?;

// write
data_file_writer.write(to_write.clone()).await?;
let res = data_file_writer.close().await?;
assert_eq!(res.len(), 1);
let data_file = res.into_iter().next().unwrap();

// check
check_parquet_data_file(&file_io, &data_file, &to_write).await;

Ok(())
}
}
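
Most of the test above is spent attaching Iceberg field ids to the Arrow schema through Parquet key/value metadata. Stripped of the nested columns, the essential pattern looks roughly like the sketch below; `single_column_batch` is a hypothetical helper condensed from the test, not part of the PR diff.

use std::{collections::HashMap, sync::Arc};

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

fn single_column_batch() -> RecordBatch {
    // Each Arrow field carries its Iceberg field id as metadata so the
    // Parquet writer can associate Arrow columns with Iceberg schema fields.
    let field = arrow_schema::Field::new("col0", arrow_schema::DataType::Int64, true)
        .with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            "0".to_string(),
        )]));
    let schema = Arc::new(arrow_schema::Schema::new(vec![field]));
    let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
    RecordBatch::try_new(schema, vec![col0]).unwrap()
}
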
20 changes: 20 additions & 0 deletions crates/iceberg/src/writer/base_writer/mod.rs
@@ -0,0 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Base writer module contains the basic writers provided by iceberg: `DataFileWriter`, `PositionDeleteFileWriter`, `EqualityDeleteFileWriter`.

pub mod data_file_writer;
6 changes: 4 additions & 2 deletions crates/iceberg/src/writer/file_writer/mod.rs
@@ -17,8 +17,8 @@

//! This module contains the writers for data file formats supported by iceberg: parquet, orc.

use super::{CurrentFileStatus, DefaultOutput};
use crate::Result;
use super::CurrentFileStatus;
use crate::{spec::DataFileBuilder, Result};
use arrow_array::RecordBatch;
use futures::Future;

@@ -28,6 +28,8 @@ mod track_writer;

pub mod location_generator;

type DefaultOutput = Vec<DataFileBuilder>;

/// File writer builder trait.
pub trait FileWriterBuilder<O = DefaultOutput>: Send + Clone + 'static {
/// The associated file writer type.