-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: fix parse partitions in manifest_list #122
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,18 +57,18 @@ impl ManifestList { | |
pub fn parse_with_version( | ||
bs: &[u8], | ||
version: FormatVersion, | ||
partition_type: &StructType, | ||
partition_types: &HashMap<i32, StructType>, | ||
) -> Result<ManifestList, Error> { | ||
match version { | ||
FormatVersion::V1 => { | ||
let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V1, bs)?; | ||
let values = Value::Array(reader.collect::<Result<Vec<Value>, _>>()?); | ||
from_value::<_serde::ManifestListV1>(&values)?.try_into(partition_type) | ||
from_value::<_serde::ManifestListV1>(&values)?.try_into(partition_types) | ||
} | ||
FormatVersion::V2 => { | ||
let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V2, bs)?; | ||
let values = Value::Array(reader.collect::<Result<Vec<Value>, _>>()?); | ||
from_value::<_serde::ManifestListV2>(&values)?.try_into(partition_type) | ||
from_value::<_serde::ManifestListV2>(&values)?.try_into(partition_types) | ||
} | ||
} | ||
} | ||
|
@@ -657,6 +657,8 @@ pub struct FieldSummary { | |
/// and then converted into the [ManifestListEntry] struct. Serialization works the other way around. | ||
/// [ManifestListEntryV1] and [ManifestListEntryV2] are internal struct that are only used for serialization and deserialization. | ||
pub(super) mod _serde { | ||
use std::collections::HashMap; | ||
|
||
pub use serde_bytes::ByteBuf; | ||
use serde_derive::{Deserialize, Serialize}; | ||
|
||
|
@@ -682,12 +684,26 @@ pub(super) mod _serde { | |
impl ManifestListV2 { | ||
/// Converts the [ManifestListV2] into a [ManifestList]. | ||
/// The convert of [entries] need the partition_type info so use this function instead of [std::TryFrom] trait. | ||
pub fn try_into(self, partition_type: &StructType) -> Result<super::ManifestList, Error> { | ||
pub fn try_into( | ||
self, | ||
partition_types: &HashMap<i32, StructType>, | ||
) -> Result<super::ManifestList, Error> { | ||
Ok(super::ManifestList { | ||
entries: self | ||
.entries | ||
.into_iter() | ||
.map(|v| v.try_into(partition_type)) | ||
.map(|v| { | ||
let partition_spec_id = v.partition_spec_id; | ||
let manifest_path = v.manifest_path.clone(); | ||
v.try_into(partition_types.get(&partition_spec_id)) | ||
.map_err(|err| { | ||
err.with_context("manifest file path", manifest_path) | ||
.with_context( | ||
"partition spec id", | ||
partition_spec_id.to_string(), | ||
) | ||
}) | ||
}) | ||
.collect::<Result<Vec<_>, _>>()?, | ||
}) | ||
} | ||
|
@@ -710,12 +726,26 @@ pub(super) mod _serde { | |
impl ManifestListV1 { | ||
/// Converts the [ManifestListV1] into a [ManifestList]. | ||
/// The convert of [entries] need the partition_type info so use this function instead of [std::TryFrom] trait. | ||
pub fn try_into(self, partition_type: &StructType) -> Result<super::ManifestList, Error> { | ||
pub fn try_into( | ||
self, | ||
partition_types: &HashMap<i32, StructType>, | ||
) -> Result<super::ManifestList, Error> { | ||
Ok(super::ManifestList { | ||
entries: self | ||
.entries | ||
.into_iter() | ||
.map(|v| v.try_into(partition_type)) | ||
.map(|v| { | ||
let partition_spec_id = v.partition_spec_id; | ||
let manifest_path = v.manifest_path.clone(); | ||
v.try_into(partition_types.get(&partition_spec_id)) | ||
.map_err(|err| { | ||
err.with_context("manifest file path", manifest_path) | ||
.with_context( | ||
"partition spec id", | ||
partition_spec_id.to_string(), | ||
) | ||
}) | ||
}) | ||
.collect::<Result<Vec<_>, _>>()?, | ||
}) | ||
} | ||
|
@@ -800,10 +830,10 @@ pub(super) mod _serde { | |
|
||
fn try_convert_to_field_summary( | ||
partitions: Option<Vec<FieldSummary>>, | ||
partition_type: &StructType, | ||
partition_type: Option<&StructType>, | ||
) -> Result<Vec<super::FieldSummary>, Error> { | ||
Ok(partitions | ||
.map(|partitions| { | ||
if let Some(partitions) = partitions { | ||
if let Some(partition_type) = partition_type { | ||
let partition_types = partition_type.fields(); | ||
if partitions.len() != partition_types.len() { | ||
return Err(Error::new( | ||
|
@@ -820,15 +850,24 @@ pub(super) mod _serde { | |
.zip(partition_types) | ||
.map(|(v, field)| v.try_into(&field.field_type)) | ||
.collect::<Result<Vec<_>, _>>() | ||
}) | ||
.transpose()? | ||
.unwrap_or_default()) | ||
} else { | ||
Err(Error::new( | ||
crate::ErrorKind::DataInvalid, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm expecting a better error message with more context, for example, the manifest list file path, the entry path, spec id. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have added context of entry path and spec id. We don't have the manifest list file path for now (we pass [u8] to parse directly), I think it need to set in the higher level API. |
||
"Invalid partition spec. Partition type is required", | ||
)) | ||
} | ||
} else { | ||
Ok(Vec::new()) | ||
} | ||
} | ||
|
||
impl ManifestListEntryV2 { | ||
/// Converts the [ManifestListEntryV2] into a [ManifestListEntry]. | ||
/// The convert of [partitions] need the partition_type info so use this function instead of [std::TryFrom] trait. | ||
pub fn try_into(self, partition_type: &StructType) -> Result<ManifestListEntry, Error> { | ||
pub fn try_into( | ||
self, | ||
partition_type: Option<&StructType>, | ||
) -> Result<ManifestListEntry, Error> { | ||
let partitions = try_convert_to_field_summary(self.partitions, partition_type)?; | ||
Ok(ManifestListEntry { | ||
manifest_path: self.manifest_path, | ||
|
@@ -853,7 +892,10 @@ pub(super) mod _serde { | |
impl ManifestListEntryV1 { | ||
/// Converts the [ManifestListEntryV1] into a [ManifestListEntry]. | ||
/// The convert of [partitions] need the partition_type info so use this function instead of [std::TryFrom] trait. | ||
pub fn try_into(self, partition_type: &StructType) -> Result<ManifestListEntry, Error> { | ||
pub fn try_into( | ||
self, | ||
partition_type: Option<&StructType>, | ||
) -> Result<ManifestListEntry, Error> { | ||
let partitions = try_convert_to_field_summary(self.partitions, partition_type)?; | ||
Ok(ManifestListEntry { | ||
manifest_path: self.manifest_path, | ||
|
@@ -1032,7 +1074,7 @@ pub(super) mod _serde { | |
|
||
#[cfg(test)] | ||
mod test { | ||
use std::{fs, sync::Arc}; | ||
use std::{collections::HashMap, fs, sync::Arc}; | ||
|
||
use tempfile::TempDir; | ||
|
||
|
@@ -1090,12 +1132,9 @@ mod test { | |
|
||
let bs = fs::read(full_path).expect("read_file must succeed"); | ||
|
||
let parsed_manifest_list = ManifestList::parse_with_version( | ||
&bs, | ||
crate::spec::FormatVersion::V1, | ||
&StructType::new(vec![]), | ||
) | ||
.unwrap(); | ||
let parsed_manifest_list = | ||
ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1, &HashMap::new()) | ||
.unwrap(); | ||
|
||
assert_eq!(manifest_list, parsed_manifest_list); | ||
} | ||
|
@@ -1120,6 +1159,23 @@ mod test { | |
deleted_rows_count: Some(0), | ||
partitions: vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Literal::long(1)), upper_bound: Some(Literal::long(1))}], | ||
key_metadata: vec![], | ||
}, | ||
ManifestListEntry { | ||
manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m1.avro".to_string(), | ||
manifest_length: 6926, | ||
partition_spec_id: 2, | ||
content: ManifestContentType::Data, | ||
sequence_number: 1, | ||
min_sequence_number: 1, | ||
added_snapshot_id: 377075049360453639, | ||
added_data_files_count: Some(1), | ||
existing_data_files_count: Some(0), | ||
deleted_data_files_count: Some(0), | ||
added_rows_count: Some(3), | ||
existing_rows_count: Some(0), | ||
deleted_rows_count: Some(0), | ||
partitions: vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Literal::float(1.1)), upper_bound: Some(Literal::float(2.1))}], | ||
key_metadata: vec![], | ||
} | ||
] | ||
}; | ||
|
@@ -1147,11 +1203,24 @@ mod test { | |
let parsed_manifest_list = ManifestList::parse_with_version( | ||
&bs, | ||
crate::spec::FormatVersion::V2, | ||
&StructType::new(vec![Arc::new(NestedField::required( | ||
1, | ||
"test", | ||
Type::Primitive(PrimitiveType::Long), | ||
))]), | ||
&HashMap::from([ | ||
( | ||
1, | ||
StructType::new(vec![Arc::new(NestedField::required( | ||
1, | ||
"test", | ||
Type::Primitive(PrimitiveType::Long), | ||
))]), | ||
), | ||
( | ||
2, | ||
StructType::new(vec![Arc::new(NestedField::required( | ||
1, | ||
"test", | ||
Type::Primitive(PrimitiveType::Float), | ||
))]), | ||
), | ||
]), | ||
) | ||
.unwrap(); | ||
|
||
|
@@ -1251,11 +1320,14 @@ mod test { | |
let manifest_list = ManifestList::parse_with_version( | ||
&bs, | ||
crate::spec::FormatVersion::V1, | ||
&StructType::new(vec![Arc::new(NestedField::required( | ||
&HashMap::from([( | ||
1, | ||
"test", | ||
Type::Primitive(PrimitiveType::Long), | ||
))]), | ||
StructType::new(vec![Arc::new(NestedField::required( | ||
1, | ||
"test", | ||
Type::Primitive(PrimitiveType::Long), | ||
))]), | ||
)]), | ||
) | ||
.unwrap(); | ||
assert_eq!(manifest_list, expected_manifest_list); | ||
|
@@ -1302,11 +1374,14 @@ mod test { | |
let manifest_list = ManifestList::parse_with_version( | ||
&bs, | ||
crate::spec::FormatVersion::V2, | ||
&StructType::new(vec![Arc::new(NestedField::required( | ||
&HashMap::from([( | ||
1, | ||
"test", | ||
Type::Primitive(PrimitiveType::Long), | ||
))]), | ||
StructType::new(vec![Arc::new(NestedField::required( | ||
1, | ||
"test", | ||
Type::Primitive(PrimitiveType::Long), | ||
))]), | ||
)]), | ||
) | ||
.unwrap(); | ||
expected_manifest_list.entries[0].sequence_number = seq_num; | ||
|
@@ -1315,4 +1390,56 @@ mod test { | |
|
||
temp_dir.close().unwrap(); | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_manifest_list_writer_v1_as_v2() { | ||
let expected_manifest_list = ManifestList { | ||
entries: vec![ManifestListEntry { | ||
manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(), | ||
manifest_length: 5806, | ||
partition_spec_id: 1, | ||
content: ManifestContentType::Data, | ||
sequence_number: 0, | ||
min_sequence_number: 0, | ||
added_snapshot_id: 1646658105718557341, | ||
added_data_files_count: Some(3), | ||
existing_data_files_count: Some(0), | ||
deleted_data_files_count: Some(0), | ||
added_rows_count: Some(3), | ||
existing_rows_count: Some(0), | ||
deleted_rows_count: Some(0), | ||
partitions: vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Literal::long(1)), upper_bound: Some(Literal::long(1))}], | ||
key_metadata: vec![], | ||
}] | ||
}; | ||
|
||
let temp_dir = TempDir::new().unwrap(); | ||
let path = temp_dir.path().join("manifest_list_v1.avro"); | ||
let io = FileIOBuilder::new_fs_io().build().unwrap(); | ||
let output_file = io.new_output(path.to_str().unwrap()).unwrap(); | ||
|
||
let mut writer = ManifestListWriter::v2(output_file, 1646658105718557341, 0, 1); | ||
writer | ||
.add_manifest_entries(expected_manifest_list.entries.clone().into_iter()) | ||
.unwrap(); | ||
writer.close().await.unwrap(); | ||
|
||
let bs = fs::read(path).unwrap(); | ||
let manifest_list = ManifestList::parse_with_version( | ||
&bs, | ||
crate::spec::FormatVersion::V2, | ||
&HashMap::from([( | ||
1, | ||
StructType::new(vec![Arc::new(NestedField::required( | ||
1, | ||
"test", | ||
Type::Primitive(PrimitiveType::Long), | ||
))]), | ||
)]), | ||
) | ||
.unwrap(); | ||
assert_eq!(manifest_list, expected_manifest_list); | ||
|
||
temp_dir.close().unwrap(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is correct 👍