diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
index 500075c21..fb9682573 100644
--- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
@@ -167,15 +167,17 @@ impl IcebergWriter for EqualityDeleteFileWriter {
 
 #[cfg(test)]
 mod test {
+    use std::collections::HashMap;
     use std::sync::Arc;
 
     use arrow_array::types::Int32Type;
     use arrow_array::{ArrayRef, BooleanArray, Int32Array, Int64Array, RecordBatch, StructArray};
     use arrow_buffer::NullBuffer;
-    use arrow_schema::DataType;
+    use arrow_schema::{DataType, Field, Fields};
     use arrow_select::concat::concat_batches;
     use itertools::Itertools;
     use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
     use parquet::file::properties::WriterProperties;
     use tempfile::TempDir;
     use uuid::Uuid;
@@ -657,7 +659,7 @@ mod test {
     #[tokio::test]
     async fn test_equality_delete_with_nullable_field() -> Result<(), anyhow::Error> {
         // prepare data
-        // Int, Struct(Int)
+        // Int, Struct(Int), Struct(Struct(Int))
         let schema = Schema::builder()
             .with_schema_id(1)
             .with_fields(vec![
@@ -673,28 +675,70 @@ mod test {
                     .into()])),
                 )
                 .into(),
+                NestedField::optional(
+                    3,
+                    "col2",
+                    Type::Struct(StructType::new(vec![NestedField::optional(
+                        4,
+                        "sub_struct_col",
+                        Type::Struct(StructType::new(vec![NestedField::optional(
+                            5,
+                            "sub_sub_col",
+                            Type::Primitive(PrimitiveType::Int),
+                        )
+                        .into()])),
+                    )
+                    .into()])),
+                )
+                .into(),
             ])
             .build()
             .unwrap();
         let arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap());
-        // null 1
-        // 2    null(struct)
-        // 3    null(field)
+        // null 1            null(struct)
+        // 2    null(struct) null(sub_struct_col)
+        // 3    null(field)  null(sub_sub_col)
         let col0 = Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef;
-        let nulls = NullBuffer::from(vec![true, false, true]);
-        let col1 = Arc::new(StructArray::new(
-            if let DataType::Struct(fields) = arrow_schema.fields.get(1).unwrap().data_type() {
-                fields.clone()
-            } else {
-                unreachable!()
-            },
-            vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))],
-            Some(nulls),
-        ));
-        let columns = vec![col0, col1];
+        let col1 = {
+            let nulls = NullBuffer::from(vec![true, false, true]);
+            Arc::new(StructArray::new(
+                if let DataType::Struct(fields) = arrow_schema.fields.get(1).unwrap().data_type() {
+                    fields.clone()
+                } else {
+                    unreachable!()
+                },
+                vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))],
+                Some(nulls),
+            ))
+        };
+        let col2 = {
+            let inner_col = {
+                let nulls = NullBuffer::from(vec![true, false, true]);
+                Arc::new(StructArray::new(
+                    Fields::from(vec![Field::new("sub_sub_col", DataType::Int32, true)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "5".to_string(),
+                        )]))]),
+                    vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))],
+                    Some(nulls),
+                ))
+            };
+            let nulls = NullBuffer::from(vec![false, true, true]);
+            Arc::new(StructArray::new(
+                if let DataType::Struct(fields) = arrow_schema.fields.get(2).unwrap().data_type() {
+                    fields.clone()
+                } else {
+                    unreachable!()
+                },
+                vec![inner_col],
+                Some(nulls),
+            ))
+        };
+        let columns = vec![col0, col1, col2];
         let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap();
-        let equality_ids = vec![0_i32, 2];
+        let equality_ids = vec![0_i32, 2, 5];
         let equality_config =
             EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap();
         let projector = equality_config.projector.clone();
@@ -705,6 +749,7 @@ mod test {
         RecordBatch::try_new(equality_config.projected_arrow_schema_ref().clone(), vec![
             Arc::new(Int32Array::from(vec![None, Some(2), Some(3)])) as ArrayRef,
             Arc::new(Int32Array::from(vec![Some(1), None, None])) as ArrayRef,
+            Arc::new(Int32Array::from(vec![None, None, None])) as ArrayRef,
         ])
         .unwrap();
         assert_eq!(to_write_projected, expect_batch);
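
Note for reviewers (outside the patch): the expected projection rows depend on Arrow distinguishing a null *struct* from a valid struct whose *field* is null. A minimal standalone sketch of that distinction, illustrative only and assuming just the arrow-array, arrow-buffer, and arrow-schema crates:

use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int32Array, StructArray};
use arrow_buffer::NullBuffer;
use arrow_schema::{DataType, Field, Fields};

fn main() {
    // One nullable Int32 child, mirroring the test's `sub_col`.
    let fields = Fields::from(vec![Field::new("sub_col", DataType::Int32, true)]);
    // Child values: row 1 still carries Some(2) even though its struct row is null.
    let child = Arc::new(Int32Array::from(vec![Some(1), Some(2), None])) as ArrayRef;
    // Struct-level validity mask: row 1 is a null struct, rows 0 and 2 are valid.
    let nulls = NullBuffer::from(vec![true, false, true]);
    let col = StructArray::new(fields, vec![child], Some(nulls));

    assert!(!col.is_null(0)); // valid struct, sub_col = 1
    assert!(col.is_null(1)); // struct-level null: masks the child's Some(2)
    assert!(!col.is_null(2)); // valid struct, but sub_col is a field-level null
}

This is why the expected projected column for field id 2 is [Some(1), None, None]: row 1 is nulled by the struct mask despite the child holding a value, while row 2 is a plain field-level null. The new col2 expectations extend the same rule one nesting level deeper, which is why the projected sub_sub_col column is all-null.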