diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index b14b1295e..6145abb2d 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -79,6 +79,22 @@ impl Manifest { Ok(Manifest { metadata, entries }) } + + /// Upgrade the format version recorded in this manifest's metadata: requesting the current version is a no-op, V1 to V2 sets `metadata.format_version` in place (entries are not touched here), and V2 to V1 returns an `ErrorKind::DataInvalid` error. + pub fn upgrade_version(&mut self, format_version: FormatVersion) -> Result<(), Error> { + match (self.metadata.format_version, format_version) { + (FormatVersion::V1, FormatVersion::V1) => Ok(()), + (FormatVersion::V2, FormatVersion::V2) => Ok(()), + (FormatVersion::V1, FormatVersion::V2) => { + self.metadata.format_version = format_version; + Ok(()) + } + (FormatVersion::V2, FormatVersion::V1) => Err(Error::new( + ErrorKind::DataInvalid, + "Cannot downgrade manifest format version from V2 to V1", + )), + } + } } /// A manifest writer. @@ -1104,8 +1120,8 @@ mod _serde { Ok(ManifestEntry { status: self.status.try_into()?, snapshot_id: Some(self.snapshot_id), - sequence_number: None, - file_sequence_number: None, + sequence_number: Some(0), + file_sequence_number: Some(0), data_file: self.data_file.try_into(partition_type, schema)?, }) } @@ -1620,8 +1636,8 @@ mod tests { entries: vec![ManifestEntry { status: ManifestStatus::Added, snapshot_id: Some(0), - sequence_number: None, - file_sequence_number: None, + sequence_number: Some(0), + file_sequence_number: Some(0), data_file: DataFile { content: DataContentType::Data, file_path: "s3://testbucket/iceberg_data/iceberg_ctl/iceberg_db/iceberg_tbl/data/00000-7-45268d71-54eb-476c-b42c-942d880c04a1-00001.parquet".to_string(), @@ -1690,8 +1706,100 @@ mod tests { ManifestEntry { status: ManifestStatus::Added, snapshot_id: Some(0), - sequence_number: None, - file_sequence_number: None, + sequence_number: Some(0), + file_sequence_number: Some(0), + data_file: DataFile { + content: DataContentType::Data, + file_path: 
"s3://testbucket/prod/db/sample/data/category=x/00010-1-d5c93668-1e52-41ac-92a6-bba590cbf249-00001.parquet".to_string(), + file_format: DataFileFormat::Parquet, + partition: Struct::from_iter( + vec![( + 1000, + Some( + Literal::try_from_bytes(&[120], &Type::Primitive(PrimitiveType::String)) + .unwrap() + ), + "category".to_string() + )] + .into_iter() + ), + record_count: 1, + file_size_in_bytes: 874, + column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]), + value_counts: HashMap::from([(1, 1), (2, 1), (3, 1)]), + null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::from([ + (1, Literal::long(1)), + (2, Literal::string("a")), + (3, Literal::string("x")) + ]), + upper_bounds: HashMap::from([ + (1, Literal::long(1)), + (2, Literal::string("a")), + (3, Literal::string("x")) + ]), + key_metadata: vec![], + split_offsets: vec![4], + equality_ids: vec![], + sort_order_id: Some(0), + }, + } + ] + }; + + let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]); + + let entry = test_manifest_read_write(manifest, writer).await; + + assert_eq!(entry.partitions.len(), 1); + assert_eq!(entry.partitions[0].lower_bound, Some(Literal::string("x"))); + assert_eq!(entry.partitions[0].upper_bound, Some(Literal::string("x"))); + } + + #[tokio::test] + async fn test_write_manifest_v1_partition_as_v2() { + let mut manifest = Manifest { + metadata: ManifestMetadata { + schema_id: 0, + schema: Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 2, + "data", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::optional( + 3, + "category", + Type::Primitive(PrimitiveType::String), + )), + ]) + .build() + .unwrap(), + partition_spec: PartitionSpec { + spec_id: 0, + fields: vec![PartitionField { + name: "category".to_string(), + transform: Transform::Identity, + 
source_id: 3, + field_id: 1000, + }], + }, + content: ManifestContentType::Data, + format_version: FormatVersion::V1, + }, + entries: vec![ + ManifestEntry { + status: ManifestStatus::Added, + snapshot_id: Some(0), + sequence_number: Some(0), + file_sequence_number: Some(0), data_file: DataFile { content: DataContentType::Data, file_path: "s3://testbucket/prod/db/sample/data/category=x/00010-1-d5c93668-1e52-41ac-92a6-bba590cbf249-00001.parquet".to_string(), @@ -1731,6 +1839,7 @@ mod tests { } ] }; + manifest.upgrade_version(FormatVersion::V2).unwrap(); let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]); diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index 9948662ea..a0d3b4eeb 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -1374,4 +1374,56 @@ mod test { temp_dir.close().unwrap(); } + + #[tokio::test] + async fn test_manifest_list_writer_v1_as_v2() { + let expected_manifest_list = ManifestList { + entries: vec![ManifestListEntry { + manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(), + manifest_length: 5806, + partition_spec_id: 1, + content: ManifestContentType::Data, + sequence_number: 0, + min_sequence_number: 0, + added_snapshot_id: 1646658105718557341, + added_data_files_count: Some(3), + existing_data_files_count: Some(0), + deleted_data_files_count: Some(0), + added_rows_count: Some(3), + existing_rows_count: Some(0), + deleted_rows_count: Some(0), + partitions: vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Literal::long(1)), upper_bound: Some(Literal::long(1))}], + key_metadata: vec![], + }] + }; + + let temp_dir = TempDir::new().unwrap(); + let path = temp_dir.path().join("manifest_list_v1.avro"); + let io = FileIOBuilder::new_fs_io().build().unwrap(); + let output_file = 
io.new_output(path.to_str().unwrap()).unwrap(); + + let mut writer = ManifestListWriter::v2(output_file, 1646658105718557341, 0, 1); + writer + .add_manifest_entries(expected_manifest_list.entries.clone().into_iter()) + .unwrap(); + writer.close().await.unwrap(); + + let bs = fs::read(path).unwrap(); + let manifest_list = ManifestList::parse_with_version( + &bs, + crate::spec::FormatVersion::V2, + &HashMap::from([( + 1, + StructType::new(vec![Arc::new(NestedField::required( + 1, + "test", + Type::Primitive(PrimitiveType::Long), + ))]), + )]), + ) + .unwrap(); + assert_eq!(manifest_list, expected_manifest_list); + + temp_dir.close().unwrap(); + } }