Skip to content

Commit

Permalink
feat: separate metadata location from serialization
Browse files Browse the repository at this point in the history
This commit changes a couple function signatures that effectively allow the user to differentiate where metadata bytes get written to
and what's contained in the metadata type. For more info on why see: <apache#778>
  • Loading branch information
Sl1mb0 committed Dec 11, 2024
1 parent bd2c20b commit 3675f6f
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 97 deletions.
52 changes: 28 additions & 24 deletions crates/iceberg/src/io/object_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,36 +253,40 @@ mod tests {
let current_snapshot = self.table.metadata().current_snapshot().unwrap();
let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
let current_partition_spec = self.table.metadata().default_partition_spec();
let manifest_file_output = self.next_manifest_file();

// Write data files
let data_file_manifest = ManifestWriter::new(
self.next_manifest_file(),
manifest_file_output.location().to_string(),
current_snapshot.snapshot_id(),
vec![],
)
.write(Manifest::new(
ManifestMetadata::builder()
.schema((*current_schema).clone())
.content(ManifestContentType::Data)
.format_version(FormatVersion::V2)
.partition_spec((**current_partition_spec).clone())
.schema_id(current_schema.schema_id())
.build(),
vec![ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/1.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.build()
.unwrap(),
)
.build()],
))
.write(
manifest_file_output,
Manifest::new(
ManifestMetadata::builder()
.schema((*current_schema).clone())
.content(ManifestContentType::Data)
.format_version(FormatVersion::V2)
.partition_spec((**current_partition_spec).clone())
.schema_id(current_schema.schema_id())
.build(),
vec![ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/1.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.build()
.unwrap(),
)
.build()],
),
)
.await
.unwrap();

Expand Down
122 changes: 63 additions & 59 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -980,72 +980,76 @@ mod tests {
.unwrap();
let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
let current_partition_spec = self.table.metadata().default_partition_spec();
let manifest_file_output = self.next_manifest_file();

// Write data files
let data_file_manifest = ManifestWriter::new(
self.next_manifest_file(),
manifest_file_output.location().to_string(),
current_snapshot.snapshot_id(),
vec![],
)
.write(Manifest::new(
ManifestMetadata::builder()
.schema((*current_schema).clone())
.content(ManifestContentType::Data)
.format_version(FormatVersion::V2)
.partition_spec((**current_partition_spec).clone())
.schema_id(current_schema.schema_id())
.build(),
vec![
ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/1.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.build()
.unwrap(),
)
.write(
manifest_file_output,
Manifest::new(
ManifestMetadata::builder()
.schema((*current_schema).clone())
.content(ManifestContentType::Data)
.format_version(FormatVersion::V2)
.partition_spec((**current_partition_spec).clone())
.schema_id(current_schema.schema_id())
.build(),
ManifestEntry::builder()
.status(ManifestStatus::Deleted)
.snapshot_id(parent_snapshot.snapshot_id())
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/2.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(200))]))
.build()
.unwrap(),
)
.build(),
ManifestEntry::builder()
.status(ManifestStatus::Existing)
.snapshot_id(parent_snapshot.snapshot_id())
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/3.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(300))]))
.build()
.unwrap(),
)
.build(),
],
))
vec![
ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/1.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.build()
.unwrap(),
)
.build(),
ManifestEntry::builder()
.status(ManifestStatus::Deleted)
.snapshot_id(parent_snapshot.snapshot_id())
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/2.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(200))]))
.build()
.unwrap(),
)
.build(),
ManifestEntry::builder()
.status(ManifestStatus::Existing)
.snapshot_id(parent_snapshot.snapshot_id())
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/3.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(300))]))
.build()
.unwrap(),
)
.build(),
],
),
)
.await
.unwrap();

Expand Down
41 changes: 27 additions & 14 deletions crates/iceberg/src/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl Manifest {

/// A manifest writer.
pub struct ManifestWriter {
output: OutputFile,
manifest_path: String,

snapshot_id: i64,

Expand All @@ -133,9 +133,9 @@ pub struct ManifestWriter {

impl ManifestWriter {
/// Create a new manifest writer.
pub fn new(output: OutputFile, snapshot_id: i64, key_metadata: Vec<u8>) -> Self {
pub fn new(manifest_path: String, snapshot_id: i64, key_metadata: Vec<u8>) -> Self {
Self {
output,
manifest_path,
snapshot_id,
added_files: 0,
added_rows: 0,
Expand Down Expand Up @@ -204,7 +204,7 @@ impl ManifestWriter {
}

/// Write a manifest.
pub async fn write(mut self, manifest: Manifest) -> Result<ManifestFile> {
pub async fn write(mut self, dest: OutputFile, manifest: Manifest) -> Result<ManifestFile> {
// Create the avro writer
let partition_type = manifest
.metadata
Expand Down Expand Up @@ -299,13 +299,13 @@ impl ManifestWriter {

let content = avro_writer.into_inner()?;
let length = content.len();
self.output.write(Bytes::from(content)).await?;
dest.write(Bytes::from(content)).await?;

let partition_summary =
self.get_field_summary_vec(manifest.metadata.partition_spec.fields());

Ok(ManifestFile {
manifest_path: self.output.location().to_string(),
manifest_path: self.manifest_path,
manifest_length: length as i64,
partition_spec_id: manifest.metadata.partition_spec.spec_id(),
content: manifest.metadata.content,
Expand Down Expand Up @@ -1621,7 +1621,9 @@ mod tests {
]
};

let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]);
let writer = |output_file: OutputFile| {
ManifestWriter::new(output_file.location().to_string(), 1, vec![])
};

test_manifest_read_write(manifest, writer).await;
}
Expand Down Expand Up @@ -1787,7 +1789,9 @@ mod tests {
})],
};

let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]);
let writer = |output_file: OutputFile| {
ManifestWriter::new(output_file.location().to_string(), 1, vec![])
};

let res = test_manifest_read_write(manifest, writer).await;

Expand Down Expand Up @@ -1854,8 +1858,13 @@ mod tests {
})],
};

let writer =
|output_file: OutputFile| ManifestWriter::new(output_file, 2966623707104393227, vec![]);
let writer = |output_file: OutputFile| {
ManifestWriter::new(
output_file.location().to_string(),
2966623707104393227,
vec![],
)
};

test_manifest_read_write(manifest, writer).await;
}
Expand Down Expand Up @@ -1940,7 +1949,9 @@ mod tests {
]
};

let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]);
let writer = |output_file: OutputFile| {
ManifestWriter::new(output_file.location().to_string(), 1, vec![])
};

let entry = test_manifest_read_write(manifest, writer).await;

Expand Down Expand Up @@ -2014,7 +2025,9 @@ mod tests {
})],
};

let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]);
let writer = |output_file: OutputFile| {
ManifestWriter::new(output_file.location().to_string(), 1, vec![])
};

let (avro_bytes, _) = write_manifest(&manifest, writer).await;

Expand Down Expand Up @@ -2107,8 +2120,8 @@ mod tests {
let path = temp_dir.path().join("test_manifest.avro");
let io = FileIOBuilder::new_fs_io().build().unwrap();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();
let writer = writer_builder(output_file);
let res = writer.write(manifest.clone()).await.unwrap();
let writer = writer_builder(output_file.clone());
let res = writer.write(output_file, manifest.clone()).await.unwrap();

// Verify manifest
(fs::read(path).expect("read_file must succeed"), res)
Expand Down

0 comments on commit 3675f6f

Please sign in to comment.