Skip to content

Commit

Permalink
- fix init-default
Browse files
- add test case for upgrade v1 to v2
  • Loading branch information
ZENOTME committed Dec 18, 2023
1 parent c87a652 commit 11dbd92
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 6 deletions.
121 changes: 115 additions & 6 deletions crates/iceberg/src/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,22 @@ impl Manifest {

Ok(Manifest { metadata, entries })
}

/// Upgrade the format version of this manifest.
pub fn upgrade_version(&mut self, format_version: FormatVersion) -> Result<(), Error> {
match (self.metadata.format_version, format_version) {
(FormatVersion::V1, FormatVersion::V1) => Ok(()),
(FormatVersion::V2, FormatVersion::V2) => Ok(()),
(FormatVersion::V1, FormatVersion::V2) => {
self.metadata.format_version = format_version;
Ok(())
}
(FormatVersion::V2, FormatVersion::V1) => Err(Error::new(
ErrorKind::DataInvalid,
"Cannot downgrade manifest format version from V2 to V1",
)),
}
}
}

/// A manifest writer.
Expand Down Expand Up @@ -1104,8 +1120,8 @@ mod _serde {
Ok(ManifestEntry {
status: self.status.try_into()?,
snapshot_id: Some(self.snapshot_id),
sequence_number: None,
file_sequence_number: None,
sequence_number: Some(0),
file_sequence_number: Some(0),
data_file: self.data_file.try_into(partition_type, schema)?,
})
}
Expand Down Expand Up @@ -1620,8 +1636,8 @@ mod tests {
entries: vec![ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: Some(0),
sequence_number: None,
file_sequence_number: None,
sequence_number: Some(0),
file_sequence_number: Some(0),
data_file: DataFile {
content: DataContentType::Data,
file_path: "s3://testbucket/iceberg_data/iceberg_ctl/iceberg_db/iceberg_tbl/data/00000-7-45268d71-54eb-476c-b42c-942d880c04a1-00001.parquet".to_string(),
Expand Down Expand Up @@ -1690,8 +1706,100 @@ mod tests {
ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: Some(0),
sequence_number: None,
file_sequence_number: None,
sequence_number: Some(0),
file_sequence_number: Some(0),
data_file: DataFile {
content: DataContentType::Data,
file_path: "s3://testbucket/prod/db/sample/data/category=x/00010-1-d5c93668-1e52-41ac-92a6-bba590cbf249-00001.parquet".to_string(),
file_format: DataFileFormat::Parquet,
partition: Struct::from_iter(
vec![(
1000,
Some(
Literal::try_from_bytes(&[120], &Type::Primitive(PrimitiveType::String))
.unwrap()
),
"category".to_string()
)]
.into_iter()
),
record_count: 1,
file_size_in_bytes: 874,
column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]),
value_counts: HashMap::from([(1, 1), (2, 1), (3, 1)]),
null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::from([
(1, Literal::long(1)),
(2, Literal::string("a")),
(3, Literal::string("x"))
]),
upper_bounds: HashMap::from([
(1, Literal::long(1)),
(2, Literal::string("a")),
(3, Literal::string("x"))
]),
key_metadata: vec![],
split_offsets: vec![4],
equality_ids: vec![],
sort_order_id: Some(0),
},
}
]
};

let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]);

let entry = test_manifest_read_write(manifest, writer).await;

assert_eq!(entry.partitions.len(), 1);
assert_eq!(entry.partitions[0].lower_bound, Some(Literal::string("x")));
assert_eq!(entry.partitions[0].upper_bound, Some(Literal::string("x")));
}

#[tokio::test]
async fn test_write_manifest_v1_partition_as_v2() {
let mut manifest = Manifest {
metadata: ManifestMetadata {
schema_id: 0,
schema: Schema::builder()
.with_fields(vec![
Arc::new(NestedField::optional(
1,
"id",
Type::Primitive(PrimitiveType::Long),
)),
Arc::new(NestedField::optional(
2,
"data",
Type::Primitive(PrimitiveType::String),
)),
Arc::new(NestedField::optional(
3,
"category",
Type::Primitive(PrimitiveType::String),
)),
])
.build()
.unwrap(),
partition_spec: PartitionSpec {
spec_id: 0,
fields: vec![PartitionField {
name: "category".to_string(),
transform: Transform::Identity,
source_id: 3,
field_id: 1000,
}],
},
content: ManifestContentType::Data,
format_version: FormatVersion::V1,
},
entries: vec![
ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: Some(0),
sequence_number: Some(0),
file_sequence_number: Some(0),
data_file: DataFile {
content: DataContentType::Data,
file_path: "s3://testbucket/prod/db/sample/data/category=x/00010-1-d5c93668-1e52-41ac-92a6-bba590cbf249-00001.parquet".to_string(),
Expand Down Expand Up @@ -1731,6 +1839,7 @@ mod tests {
}
]
};
manifest.upgrade_version(FormatVersion::V2).unwrap();

let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]);

Expand Down
52 changes: 52 additions & 0 deletions crates/iceberg/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1374,4 +1374,56 @@ mod test {

temp_dir.close().unwrap();
}

/// Writes a manifest list entry whose sequence numbers are both 0 through the
/// V2 writer, reads the file back with the V2 parser, and checks that the
/// parsed list equals what was written.
#[tokio::test]
async fn test_manifest_list_writer_v1_as_v2() {
    let entry = ManifestListEntry {
        manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
        manifest_length: 5806,
        partition_spec_id: 1,
        content: ManifestContentType::Data,
        sequence_number: 0,
        min_sequence_number: 0,
        added_snapshot_id: 1646658105718557341,
        added_data_files_count: Some(3),
        existing_data_files_count: Some(0),
        deleted_data_files_count: Some(0),
        added_rows_count: Some(3),
        existing_rows_count: Some(0),
        deleted_rows_count: Some(0),
        partitions: vec![FieldSummary {
            contains_null: false,
            contains_nan: Some(false),
            lower_bound: Some(Literal::long(1)),
            upper_bound: Some(Literal::long(1)),
        }],
        key_metadata: vec![],
    };
    let expected_manifest_list = ManifestList {
        entries: vec![entry],
    };

    // Write the list into a scratch directory using the V2 writer.
    let temp_dir = TempDir::new().unwrap();
    let avro_path = temp_dir.path().join("manifest_list_v1.avro");
    let file_io = FileIOBuilder::new_fs_io().build().unwrap();
    let output_file = file_io.new_output(avro_path.to_str().unwrap()).unwrap();

    let mut writer = ManifestListWriter::v2(output_file, 1646658105718557341, 0, 1);
    writer
        .add_manifest_entries(expected_manifest_list.entries.clone().into_iter())
        .unwrap();
    writer.close().await.unwrap();

    // Read the raw bytes back and parse them as a V2 manifest list. The map
    // is keyed by 1, matching the entry's `partition_spec_id`.
    let raw = fs::read(avro_path).unwrap();
    let partition_types = HashMap::from([(
        1,
        StructType::new(vec![Arc::new(NestedField::required(
            1,
            "test",
            Type::Primitive(PrimitiveType::Long),
        ))]),
    )]);
    let manifest_list =
        ManifestList::parse_with_version(&raw, crate::spec::FormatVersion::V2, &partition_types)
            .unwrap();
    assert_eq!(manifest_list, expected_manifest_list);

    temp_dir.close().unwrap();
}
}

0 comments on commit 11dbd92

Please sign in to comment.