diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs index 731072a5a..a41e99511 100644 --- a/crates/iceberg/src/io/object_cache.rs +++ b/crates/iceberg/src/io/object_cache.rs @@ -253,36 +253,40 @@ mod tests { let current_snapshot = self.table.metadata().current_snapshot().unwrap(); let current_schema = current_snapshot.schema(self.table.metadata()).unwrap(); let current_partition_spec = self.table.metadata().default_partition_spec(); + let manifest_file_output = self.next_manifest_file(); // Write data files let data_file_manifest = ManifestWriter::new( - self.next_manifest_file(), + manifest_file_output.location().to_string(), current_snapshot.snapshot_id(), vec![], ) - .write(Manifest::new( - ManifestMetadata::builder() - .schema((*current_schema).clone()) - .content(ManifestContentType::Data) - .format_version(FormatVersion::V2) - .partition_spec((**current_partition_spec).clone()) - .schema_id(current_schema.schema_id()) - .build(), - vec![ManifestEntry::builder() - .status(ManifestStatus::Added) - .data_file( - DataFileBuilder::default() - .content(DataContentType::Data) - .file_path(format!("{}/1.parquet", &self.table_location)) - .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) - .record_count(1) - .partition(Struct::from_iter([Some(Literal::long(100))])) - .build() - .unwrap(), - ) - .build()], - )) + .write( + manifest_file_output, + Manifest::new( + ManifestMetadata::builder() + .schema((*current_schema).clone()) + .content(ManifestContentType::Data) + .format_version(FormatVersion::V2) + .partition_spec((**current_partition_spec).clone()) + .schema_id(current_schema.schema_id()) + .build(), + vec![ManifestEntry::builder() + .status(ManifestStatus::Added) + .data_file( + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!("{}/1.parquet", &self.table_location)) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(Struct::from_iter([Some(Literal::long(100))])) + .build() + .unwrap(), + ) + .build()], + ), + ) .await .unwrap(); diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index bc7f10a0e..ebe934d19 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -980,72 +980,76 @@ mod tests { .unwrap(); let current_schema = current_snapshot.schema(self.table.metadata()).unwrap(); let current_partition_spec = self.table.metadata().default_partition_spec(); + let manifest_file_output = self.next_manifest_file(); // Write data files let data_file_manifest = ManifestWriter::new( - self.next_manifest_file(), + manifest_file_output.location().to_string(), current_snapshot.snapshot_id(), vec![], ) - .write(Manifest::new( - ManifestMetadata::builder() - .schema((*current_schema).clone()) - .content(ManifestContentType::Data) - .format_version(FormatVersion::V2) - .partition_spec((**current_partition_spec).clone()) - .schema_id(current_schema.schema_id()) - .build(), - vec![ - ManifestEntry::builder() - .status(ManifestStatus::Added) - .data_file( - DataFileBuilder::default() - .content(DataContentType::Data) - .file_path(format!("{}/1.parquet", &self.table_location)) - .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) - .record_count(1) - .partition(Struct::from_iter([Some(Literal::long(100))])) - .build() - .unwrap(), - ) + .write( + manifest_file_output, + Manifest::new( + ManifestMetadata::builder() + .schema((*current_schema).clone()) + .content(ManifestContentType::Data) + .format_version(FormatVersion::V2) + .partition_spec((**current_partition_spec).clone()) + .schema_id(current_schema.schema_id()) .build(), - ManifestEntry::builder() - .status(ManifestStatus::Deleted) - .snapshot_id(parent_snapshot.snapshot_id()) - .sequence_number(parent_snapshot.sequence_number()) - .file_sequence_number(parent_snapshot.sequence_number()) - .data_file( - DataFileBuilder::default() - .content(DataContentType::Data) - .file_path(format!("{}/2.parquet", &self.table_location)) - .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) - .record_count(1) - .partition(Struct::from_iter([Some(Literal::long(200))])) - .build() - .unwrap(), - ) - .build(), - ManifestEntry::builder() - .status(ManifestStatus::Existing) - .snapshot_id(parent_snapshot.snapshot_id()) - .sequence_number(parent_snapshot.sequence_number()) - .file_sequence_number(parent_snapshot.sequence_number()) - .data_file( - DataFileBuilder::default() - .content(DataContentType::Data) - .file_path(format!("{}/3.parquet", &self.table_location)) - .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) - .record_count(1) - .partition(Struct::from_iter([Some(Literal::long(300))])) - .build() - .unwrap(), - ) - .build(), - ], - )) + vec![ + ManifestEntry::builder() + .status(ManifestStatus::Added) + .data_file( + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!("{}/1.parquet", &self.table_location)) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(Struct::from_iter([Some(Literal::long(100))])) + .build() + .unwrap(), + ) + .build(), + ManifestEntry::builder() + .status(ManifestStatus::Deleted) + .snapshot_id(parent_snapshot.snapshot_id()) + .sequence_number(parent_snapshot.sequence_number()) + .file_sequence_number(parent_snapshot.sequence_number()) + .data_file( + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!("{}/2.parquet", &self.table_location)) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(Struct::from_iter([Some(Literal::long(200))])) + .build() + .unwrap(), + ) + .build(), + ManifestEntry::builder() + .status(ManifestStatus::Existing) + .snapshot_id(parent_snapshot.snapshot_id()) + .sequence_number(parent_snapshot.sequence_number()) + .file_sequence_number(parent_snapshot.sequence_number()) + .data_file( + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!("{}/3.parquet", &self.table_location)) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap(), + ) + .build(), + ], + ), + ) .await .unwrap(); diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index f0dfdf47c..83ca9459c 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -113,7 +113,7 @@ impl Manifest { /// A manifest writer. pub struct ManifestWriter { - output: OutputFile, + manifest_path: String, snapshot_id: i64, @@ -133,9 +133,9 @@ pub struct ManifestWriter { impl ManifestWriter { /// Create a new manifest writer. - pub fn new(output: OutputFile, snapshot_id: i64, key_metadata: Vec) -> Self { + pub fn new(manifest_path: String, snapshot_id: i64, key_metadata: Vec) -> Self { Self { - output, + manifest_path, snapshot_id, added_files: 0, added_rows: 0, @@ -204,7 +204,7 @@ impl ManifestWriter { } /// Write a manifest. - pub async fn write(mut self, manifest: Manifest) -> Result { + pub async fn write(mut self, dest: OutputFile, manifest: Manifest) -> Result { // Create the avro writer let partition_type = manifest .metadata @@ -299,13 +299,13 @@ impl ManifestWriter { let content = avro_writer.into_inner()?; let length = content.len(); - self.output.write(Bytes::from(content)).await?; + dest.write(Bytes::from(content)).await?; let partition_summary = self.get_field_summary_vec(manifest.metadata.partition_spec.fields()); Ok(ManifestFile { - manifest_path: self.output.location().to_string(), + manifest_path: self.manifest_path, manifest_length: length as i64, partition_spec_id: manifest.metadata.partition_spec.spec_id(), content: manifest.metadata.content, @@ -1621,7 +1621,9 @@ mod tests { ] }; - let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]); + let writer = |output_file: OutputFile| { + ManifestWriter::new(output_file.location().to_string(), 1, vec![]) + }; test_manifest_read_write(manifest, writer).await; } @@ -1787,7 +1789,9 @@ mod tests { })], }; - let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]); + let writer = |output_file: OutputFile| { + ManifestWriter::new(output_file.location().to_string(), 1, vec![]) + }; let res = test_manifest_read_write(manifest, writer).await; @@ -1854,8 +1858,13 @@ mod tests { })], }; - let writer = - |output_file: OutputFile| ManifestWriter::new(output_file, 2966623707104393227, vec![]); + let writer = |output_file: OutputFile| { + ManifestWriter::new( + output_file.location().to_string(), + 2966623707104393227, + vec![], + ) + }; test_manifest_read_write(manifest, writer).await; } @@ -1940,7 +1949,9 @@ mod tests { ] }; - let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]); + let writer = |output_file: OutputFile| { + ManifestWriter::new(output_file.location().to_string(), 1, vec![]) + }; let entry = test_manifest_read_write(manifest, writer).await; @@ -2014,7 +2025,9 @@ mod tests { })], }; - let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]); + let writer = |output_file: OutputFile| { + ManifestWriter::new(output_file.location().to_string(), 1, vec![]) + }; let (avro_bytes, _) = write_manifest(&manifest, writer).await; @@ -2107,8 +2120,8 @@ mod tests { let path = temp_dir.path().join("test_manifest.avro"); let io = FileIOBuilder::new_fs_io().build().unwrap(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let writer = writer_builder(output_file); - let res = writer.write(manifest.clone()).await.unwrap(); + let writer = writer_builder(output_file.clone()); + let res = writer.write(output_file, manifest.clone()).await.unwrap(); // Verify manifest (fs::read(path).expect("read_file must succeed"), res)