
init writer framework #135

Closed · wants to merge 4 commits

Conversation

@ZENOTME (Contributor) commented Dec 26, 2023

Related issue: #34
I drafted a writer framework which has already been implemented in icelake (icelake-io/icelake#243) and proved to be extensible and flexible. The following is an introduction to this API:

Target

The writer API is designed to be extensible and flexible. Each writer is decoupled and can be created and configured independently. Users can:

  1. Combine different writer builders to build a writer with complex write logic, such as FanoutPartition + DataFileWrite or FanoutPartition + PositionDeleteFileWrite.
  2. Customize a writer and combine it with the original writer builders to build a writer that processes data in a specific way.

How it works

There are two kinds of writers and related builders (a minimal sketch of both follows the list):

  1. IcebergWriter and IcebergWriterBuilder, which focus on the data processing logic.
    To support new data processing logic, implement a new IcebergWriter and IcebergWriterBuilder.
  2. FileWriter and FileWriterBuilder, which focus on the physical file write.
    To support a new physical file format, implement a new FileWriter and FileWriterBuilder.
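
A minimal sketch of the two trait pairs, pieced together from the signatures quoted later in this review (the Result and DataFileBuilder types come from the crate; everything else here is illustrative, not the final API):

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;

#[async_trait::async_trait]
pub trait FileWriterBuilder: Send + Clone + 'static {
    type R: FileWriter;
    /// Build a physical file writer (parquet, orc, ...) for the given schema.
    async fn build(self, schema: &SchemaRef) -> Result<Self::R>;
}

#[async_trait::async_trait]
pub trait FileWriter: Send + 'static {
    /// Write a record batch into the physical file.
    async fn write(&mut self, batch: &RecordBatch) -> Result<()>;
}

#[async_trait::async_trait]
pub trait IcebergWriterBuilder<I = RecordBatch>: Send + Clone + 'static {
    type R: IcebergWriter<I>;
    /// Build an iceberg writer that implements the data processing logic.
    async fn build(self, schema: &SchemaRef) -> Result<Self::R>;
}

#[async_trait::async_trait]
pub trait IcebergWriter<I = RecordBatch>: Send + 'static {
    /// Write data to the iceberg table.
    async fn write(&mut self, input: I) -> Result<()>;
    /// Flush the writer and return the written data files.
    async fn flush(&mut self) -> Result<Vec<DataFileBuilder>>;
}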

The process of creating an iceberg writer is:

  1. Create a FileWriterBuilder.
    1a. Combine it with other FileWriterBuilders to get a new FileWriterBuilder.
  2. Use the FileWriterBuilder to create an IcebergWriterBuilder.
    2a. Combine it with other IcebergWriterBuilders to get a new IcebergWriterBuilder.
  3. Use the build function of the IcebergWriterBuilder to create an IcebergWriter.

Simple Case 1: Create a data file writer using the parquet file format.

// 1. Create a parquet file writer builder.
let parquet_writer_builder = ParquetFileWriterBuilder::new(parquet_file_writer_config);
// 2. Create a data file writer builder.
let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, data_file_writer_config);
// 3. Create an iceberg writer.
let mut iceberg_writer = data_file_writer_builder.build(schema).await?;

iceberg_writer.write(input).await?;

let write_result = iceberg_writer.flush().await?;

Complex Case 2: Create a fanout partition data file writer using the parquet file format.

// 1. Create a parquet file writer builder.
let parquet_writer_builder = ParquetFileWriterBuilder::new(parquet_file_writer_config);
// 2. Create a data file writer builder.
let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, data_file_writer_config);
// 3. Create a fanout partition writer builder.
let fanout_partition_writer_builder = FanoutPartitionWriterBuilder::new(data_file_writer_builder, partition_config);
// 4. Create an iceberg writer.
let mut iceberg_writer = fanout_partition_writer_builder.build(schema).await?;

iceberg_writer.write(input).await?;

let write_result = iceberg_writer.flush().await?;

More cases: see the example in icelake:

https://github.com/icelake-io/icelake/blob/949dda79d2ebdfa7ad07e2f88a67d01d7040c181/icelake/tests/insert_tests_v2.rs#L164

Feel free to leave any suggestions; I'm glad to address them.

@liurenjie1024 (Contributor)

I would suggest having a class hierarchy diagram to demonstrate the whole picture of this design.

@ZENOTME (Contributor, Author) commented Dec 26, 2023

[diagram: logical structure of the writer design]

I drew a diagram to demonstrate the design of the writer. Please let me know if any parts are still confusing.
One point to note is that iceberg writers are logically separated into base file writers and functional writers.

Base file writers correspond to the base file formats in iceberg: data file, position delete file, equality delete file. They take a file writer, through which they write their content into a physical format (parquet, orc, ...).

Functional writers are higher-level writers that provide more complex write logic, e.g. FanoutPartitionWriter. It acts like https://github.com/apache/iceberg/blob/cbb50bfa5ad8cd490e991ff4e1d7bf7c025e3d5d/core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java#L28 and can be combined with other iceberg writers.

Base file writers and functional writers are only distinguished logically; both of them are IcebergWriters.
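
As a hypothetical sketch of that composition (this is not the PR's code: partition_key is a stand-in for the real partition-value computation, and a real implementation would split a batch that spans multiple partitions), a fanout partition writer can lazily build one inner writer per partition from the wrapped builder:

use std::collections::HashMap;

pub struct FanoutPartitionWriter<B: IcebergWriterBuilder> {
    inner_builder: B,
    schema: SchemaRef,
    // One inner writer per partition value, created lazily on first write.
    writers: HashMap<String, B::R>,
}

#[async_trait::async_trait]
impl<B: IcebergWriterBuilder> IcebergWriter for FanoutPartitionWriter<B> {
    async fn write(&mut self, input: RecordBatch) -> Result<()> {
        // Hypothetical helper: compute the partition value of this batch.
        let key = partition_key(&input);
        if !self.writers.contains_key(&key) {
            let writer = self.inner_builder.clone().build(&self.schema).await?;
            self.writers.insert(key.clone(), writer);
        }
        self.writers.get_mut(&key).unwrap().write(input).await
    }

    async fn flush(&mut self) -> Result<Vec<DataFileBuilder>> {
        // Collect the data files produced by every per-partition inner writer.
        let mut files = Vec::new();
        for writer in self.writers.values_mut() {
            files.extend(writer.flush().await?);
        }
        Ok(files)
    }
}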

@liurenjie1024 (Contributor)

I have worked together with @ZENOTME to implement such a class hierarchy in icelake, so I'm familiar with this. I feel that a UML diagram might describe this better, for example the one-to-many relationships and the inheritance.

@liurenjie1024 (Contributor)

You can find how to write a UML class diagram here: https://plantuml.com/class-diagram

@ZENOTME (Contributor, Author) commented Jan 8, 2024

Sorry for being late. I drew a UML diagram; I hope it makes the design clearer. Please let me know if anything is still confusing.
[UML class diagram of the writer design]

@liurenjie1024 (Contributor)

> Sorry for being late. I drew a UML diagram; I hope it makes the design clearer. […]

I think this UML diagram is clear enough to demonstrate the architecture of the writers. We have applied the same design in icelake, and it works well. Looking forward to hearing your thoughts. cc @Fokko @Xuanwo


/// The current file status of an iceberg writer. It is implemented for writers that write a
/// single file.
pub trait CurrentFileStatus {
Member:

Should we introduce a separate trait for this, or would it be better to add a status(&self) -> FileStatus method?

Also, implementing APIs that require &self complicates making our type Sync, as it necessitates additional Arc<Mutex> wrappers. Could we have them accept &mut self instead? We don't have any use cases for browsing files, do we?

Contributor (Author):

> Should we introduce a separate trait for this, or would it be better to add a status(&self) -> FileStatus method?

The reason we introduce it as a separate trait is that some writers may write multiple files at once, so this trait should not be implemented for them, e.g. the fanout partition writer.

> Could we have them accept &mut self instead?

This interface is read-only, so making it &mut self seems weird to me. 🤔 And I wonder in which case a user would want to use Arc<Writer>? That seems like a rare case to me.

}

/// The write result of iceberg writer.
pub trait IcebergWriteResult: Send + Sync + 'static {
Member:

The fact that the result only reveals a set_xxx API is somewhat confusing. How are users supposed to utilize these APIs?

// 4. Create an iceberg writer.
let mut iceberg_writer = fanout_partition_writer_builder.build(schema).await?;

iceberg_writer.write(input).await?;

let write_result = iceberg_writer.flush().await?;

How will write_result be used?

@ZENOTME (Contributor, Author) commented Jan 9, 2024

In this case, the user can see the type of write_result directly, and they can call write_result.build() to get the DataFile.

We could also expose a build interface to make it clear, like:

trait IcebergWriteResult {
  fn build(self) -> Vec<DataFile>;
}

But I'm rethinking whether we really need this abstraction 🤔

We use a generic param here because we can have a DeltaResult for a delta writer, like:

struct DeltaResult {
  data_file: Vec<DataFileBuilder>,
  pos_delete_file: Vec<DataFileBuilder>,
  eq_delete_file: Vec<DataFileBuilder>
}

But I find that the type can actually be read from DataFile directly, so returning Vec<DataFileBuilder> also works.

So I think maybe we can also discard IcebergWriteResult and directly use write(&mut self, input: I) -> Vec<DataFileBuilder> to make the interface simpler, because at least for now the result a writer returns in iceberg is always DataFile. I'm not sure whether we should preserve the extensibility here.

/// The associated iceberg write result type.
type R: IcebergWriteResult;
/// Convert to iceberg write result.
fn to_iceberg_result(self) -> Self::R;
Member:

What transform will we do in this trait? Can we return IcebergWriteResult directly?

@ZENOTME (Contributor, Author) commented Jan 9, 2024

Yes, I find that we can use IcebergWriteResult directly.🥵 Good point!

Contributor:

Yes, I also think the FileWriteResult/IcebergWriterResult is a little too complicated. What FileWriter returns is just a partial DataFile, so a DataFileBuilder would be enough.

/// The associated file write result type.
type R: FileWriteResult;
/// Write record batch to file.
async fn write(&mut self, batch: &RecordBatch) -> Result<()>;
Member:

What's the relation of RecordBatch with the I in IcebergWriter<I>?

Contributor (Author):

The reason we use I in IcebergWriter<I> is that some writers need a specific input format rather than RecordBatch. E.g. the PositionDeleteWriter needs a write interface like write(&mut self, file: &str, position: usize). To support this case, we make IcebergWriter<I>.

But every IcebergWriter will eventually write its data into a file using a FileWriter. At that point, the IcebergWriter should convert the data into a RecordBatch and write it using the FileWriter.

E.g. for PositionDeleteWriter, we use write(&mut self, file: &str, position: usize), and PositionDeleteWriter will batch the data like:

---
file, position,
file, position,
...
---

and when it has accumulated enough data, it converts all of it into a RecordBatch and writes it using the FileWriter.

For me, RecordBatch is a physical data representation, and I is like a logical representation. For the data file writer, the logical and physical representations are the same; for the position delete writer, they are different.
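
A rough sketch of that conversion step with arrow (illustrative only: the column names and the exact buffering strategy are assumptions, not the PR's code):

use std::sync::Arc;
use arrow_array::{Int64Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};

/// Convert buffered (file, position) pairs into a RecordBatch that a
/// FileWriter can write into a physical file format.
fn to_record_batch(files: Vec<String>, positions: Vec<i64>) -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![
        Field::new("file_path", DataType::Utf8, false),
        Field::new("pos", DataType::Int64, false),
    ]));
    RecordBatch::try_new(
        schema,
        vec![
            Arc::new(StringArray::from(files)),
            Arc::new(Int64Array::from(positions)),
        ],
    )
    .expect("arrays match the schema")
}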

@liurenjie1024 (Contributor) left a comment

I think we should simplify the write result layer; only using DataFileBuilder would be enough. Others LGTM.


@ZENOTME (Contributor, Author) commented Jan 12, 2024

To make the design simpler, I have removed the **WriteResult traits. Maybe it's time to mark it as ready for review 🤔

@ZENOTME marked this pull request as ready for review January 12, 2024 04:42

@liurenjie1024 (Contributor) left a comment

Generally LGTM.

use arrow_schema::SchemaRef;

/// File writer builder trait.
#[async_trait::async_trait]
Contributor:

I think we can remove this now? We have upgraded to 1.75
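
For context, Rust 1.75 stabilized async fn in traits, so the attribute can indeed go; a sketch of the builder trait without #[async_trait::async_trait] (note the returned future is then not guaranteed to be Send, which a later comment addresses):

/// File writer builder trait using native async fn in trait (Rust 1.75+).
pub trait FileWriterBuilder: Send + Clone + 'static {
    type R;
    async fn build(self, schema: &SchemaRef) -> Result<Self::R>;
}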

type DefaultInput = RecordBatch;

/// The builder for iceberg writer.
#[async_trait::async_trait]
Contributor:

Ditto.


/// The builder for iceberg writer.
#[async_trait::async_trait]
pub trait IcebergWriterBuilder<I = DefaultInput>: Send + Clone + 'static {
Contributor:

Ditto.

/// Write data to iceberg table.
async fn write(&mut self, input: I) -> Result<()>;
/// Flush the writer and return the write result.
async fn flush(&mut self) -> Result<Vec<DataFileBuilder>>;
@liurenjie1024 (Contributor) commented Jan 12, 2024

I think Vec<DataFileBuilder> may not be type safe. But I also don't think we need a common trait for IcebergWriterResult. How about making it IcebergWriter<I, R>, where R is its return type? The reason I propose this is that when an IcebergWriter is used, the caller usually knows its concrete type, and the most useful use case of this abstraction is that we can have different layers for the writer, which should not change its input and return types.
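
A minimal sketch of that proposal, reusing the DefaultInput alias from the diff (the DefaultOutput name is illustrative):

type DefaultOutput = Vec<DataFileBuilder>;

#[async_trait::async_trait]
pub trait IcebergWriter<I = DefaultInput, R = DefaultOutput>: Send + 'static {
    /// Write data to the iceberg table.
    async fn write(&mut self, input: I) -> Result<()>;
    /// Flush the writer and return its concrete result type.
    async fn flush(&mut self) -> Result<R>;
}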

@liurenjie1024 (Contributor) left a comment

LGTM, thanks!

@liurenjie1024 requested a review from Fokko January 12, 2024 08:15

@ZENOTME (Contributor, Author) commented Jan 16, 2024

Hi, I find that we can modify

/// The current file status of an iceberg writer. It is implemented for writers that write a
/// single file.
pub trait CurrentFileStatus {
    /// Get the current file path.
    fn current_file_path(&self) -> String;
    /// Get the current file row number.
    fn current_row_num(&self) -> usize;
    /// Get the current file written size.
    fn current_written_size(&self) -> usize;
}

as

pub trait CurrentStatus<T> {
  fn current_status(&self) -> T;
}

impl CurrentStatus<CurrentFileStatus> for DataFileWriter {
   ...
}

impl CurrentStatus<DataFileMetrics> for DataFileWriter {
  ...
}

Traits like this can also help export metrics in the future.

@liurenjie1024 (Contributor)

> Hi, I find that we can modify […] Traits like this can also help export metrics in the future.

A trait is usually used to abstract things and hide the concrete implementation. I don't quite understand the goal of this CurrentStatus trait and when it would be useful.

@ZENOTME (Contributor, Author) commented Jan 17, 2024

> A trait is usually used to abstract things and hide the concrete implementation. I don't quite understand the goal of this CurrentStatus trait and when it would be useful.

Sorry, let me try to illustrate more clearly with an example:
We may have several kinds of partition writers in the future: a fanout partition writer will compute the partition values and dispatch rows accordingly, and a precompute partition writer will accept data whose partition values were computed beforehand. But both of them will expose the same metrics, PartitionMetrics, like the following:

struct PartitionMetrics {
  partition_num: usize,
}

struct FanoutPartitionWriter;
impl CurrentStatus<PartitionMetrics> for FanoutPartitionWriter { /* ... */ }

struct PrecomputePartitionWriter;
impl CurrentStatus<PartitionMetrics> for PrecomputePartitionWriter { /* ... */ }

And through CurrentStatus<PartitionMetrics>, we can write a metrics monitor writer that accepts both of them as input.

struct PartitionMetricsMonitorWriter<W: IcebergWriter + CurrentStatus<PartitionMetrics>> {
  writer: W
}

Do we have another way?

Yes, we can also do it like this:

pub trait CurrentFileStatus {
    /// Get the current file path.
    fn current_file_path(&self) -> String;
    /// Get the current file row number.
    fn current_row_num(&self) -> usize;
    /// Get the current file written size.
    fn current_written_size(&self) -> usize;
}

trait CurrentPartitionMetrics {
  fn metrics(&self) -> PartitionMetrics;
}

trait CurrentDataFileWriterMetrics {
  fn metrics(&self) -> DataFileWriterMetrics;
}

But I feel that all these traits can be abstracted as pub trait CurrentStatus<T>, so that instead of defining more traits, we can just define more data types in the future. The benefit of this may not be obvious, so both ways look good to me.

@liurenjie1024 (Contributor)

This design is a bit too complicated for me. I still prefer the approach we used in icelake; let's not overuse traits just to save code.

@ZENOTME (Contributor, Author) commented Jan 17, 2024

> This design is a bit too complicated for me. I still prefer the approach we used in icelake; let's not overuse traits just to save code.

Actually, this design is meant to solve a problem I'm hitting in icelake right now 🤔. In icelake, we use an interface like the following:

impl FanoutPartitionWriter {
   fn metrics(&self) -> PartitionMetrics
}

And we have a corresponding monitor writer:

struct ParititionMetricsMonitorWriter {
  writer: FanoutPartitionWriter
}

But after I introduced a new partition writer, PrecomputePartitionWriter, I found that I couldn't reuse ParititionMetricsMonitorWriter because it's coupled with FanoutPartitionWriter. But the metrics (state) they expose are the same, so I think maybe we need to abstract the interface that exposes the metrics (state). I'm not sure whether it's worth solving this by introducing a new design, though.

@liurenjie1024 (Contributor)

> Actually, this design is meant to solve a problem I'm hitting in icelake right now […] I think maybe we need to abstract the interface that exposes the metrics (state).

This kind of refactoring helps avoid some code duplication, but I don't think it is a good abstraction. A good abstraction should be clean and easy to understand. Even your example is not quite convincing: for example, what if the metrics of these two partition writers diverge someday in the future, e.g. if I want to measure the latency of the partition computation?

@ZENOTME (Contributor, Author) commented Jan 17, 2024

> This kind of refactoring helps avoid some code duplication, but I don't think it is a good abstraction. […]

Sounds reasonable. Let's stick with the earlier approach for now; we can address the code duplication once we have a good abstraction.

@liurenjie1024 (Contributor)

cc @Fokko @Xuanwo PTAL

use arrow_schema::SchemaRef;

/// File writer builder trait.
#[allow(async_fn_in_trait)]
Member:

It's better to use impl Future in trait so that we can make sure the future is Send:

fn build(self, schema: &SchemaRef) -> impl Future<Output = Result<Self::R>> + Send;

@ZENOTME (Contributor, Author) commented Jan 17, 2024

I feel that this writer framework may need more discussion, so I can split it into the IcebergWriter and FileWriter parts.
The IcebergWriter part is about how to organize our writers; it's the core and more complicated.
The FileWriter part is an abstraction over file formats like parquet and orc; it's simpler.

We can work on the FileWriter part first if that looks good. What do you think? @Xuanwo @Fokko

@Xuanwo (Member) commented Jan 17, 2024

> We can work on the FileWriter part first if that looks good. What do you think? @Xuanwo @Fokko

LGTM! It's good to merge things in small chunks and polish them during real usage.

@liurenjie1024 (Contributor)

> We can work on the FileWriter part first if that looks good. What do you think? @Xuanwo @Fokko

+1

@liurenjie1024 (Contributor)

Let's mark this PR as draft? cc @ZENOTME

@ZENOTME marked this pull request as draft January 23, 2024 05:31

@ZENOTME (Contributor, Author) commented Jan 23, 2024

I found a new way to let this API be used like the following:

let writer = FileWriterHelper::new(MockFileWriterBuilder)
            // build the file writer first
            .layer(MockFileWriterBuilder)
            .layer(MockFileWriterBuilder)
            .finish(MockIcebergWriterBuilder)
            // after finish, build the iceberg writer
            .layer(MockIcebergWriterBuilder)
            .layer(MockIcebergWriterBuilder)
            .build()
            .await
            .unwrap();

This may make the interface easier to make sense of 🤔. Feel free to leave any suggestions or points of confusion; I'm glad to make it easier to understand and use.

@Xuanwo (Member) commented Jan 23, 2024

> This may make the interface easier to make sense of.

The real code could be like:

let writer = FileWriterHelper::new(builder_a)
            .layer(builder_b)
            .layer(builder_c)
            .finish(builder_d)
            .layer(builder_e)
            .layer(builder_f)
            .build()
            .await
            .unwrap();

This is harder to understand. The original design, like the following, is verbose but much easier to read, even when the variable names are not chosen carefully.

let builder = ParquetFileWriterBuilder::new(parquet_file_writer_config);
let builder = DataFileWriterBuilder::new(builder, data_file_writer_config);
let writer = builder.build(schema).await?;

@ZENOTME (Contributor, Author) commented Jan 23, 2024

> This is harder to understand. The original design, like the following, is verbose but much easier to read […]

Thanks for the feedback. Let's keep the original design; it was just an experiment 🤣.

@ZENOTME mentioned this pull request Feb 13, 2024

@ZENOTME (Contributor, Author) commented Apr 23, 2024

Closed by #275.

@ZENOTME closed this Apr 23, 2024