Skip to content

Commit

Permalink
1. add method for data file
Browse files Browse the repository at this point in the history
2. mark add entry method as crate private
  • Loading branch information
ZENOTME committed Jan 17, 2025
1 parent b39d7db commit b701bd9
Showing 1 changed file with 78 additions and 12 deletions.
90 changes: 78 additions & 12 deletions crates/iceberg/src/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,26 +304,26 @@ impl ManifestWriter {
Ok(field_stats.into_iter().map(|stat| stat.finish()).collect())
}

fn check_entry(&self, entry: &ManifestEntry) -> Result<()> {
fn check_data_file(&self, data_file: &DataFile) -> Result<()> {
match self.metadata.content {
ManifestContentType::Data => {
if entry.data_file.content != DataContentType::Data {
if data_file.content != DataContentType::Data {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Content type of entry {:?} should have DataContentType::Data",
entry.data_file.content
data_file.content
),
));
}
}
ManifestContentType::Deletes => {
if entry.data_file.content != DataContentType::EqualityDeletes
&& entry.data_file.content != DataContentType::PositionDeletes
if data_file.content != DataContentType::EqualityDeletes
&& data_file.content != DataContentType::PositionDeletes
{
return Err(Error::new(
ErrorKind::DataInvalid,
format!("Content type of entry {:?} should have DataContentType::EqualityDeletes or DataContentType::PositionDeletes", entry.data_file.content),
format!("Content type of entry {:?} should have DataContentType::EqualityDeletes or DataContentType::PositionDeletes", data_file.content),
));
}
}
Expand All @@ -336,8 +336,8 @@ impl ManifestWriter {
/// - Set the snapshot id to the current snapshot id
/// - Set the sequence number to `None` if it is invalid (smaller than 0)
/// - Set the file sequence number to `None`
pub fn add(&mut self, mut entry: ManifestEntry) -> Result<()> {
self.check_entry(&entry)?;
pub(crate) fn add(&mut self, mut entry: ManifestEntry) -> Result<()> {
self.check_data_file(&entry.data_file)?;
if entry.sequence_number().is_some_and(|n| n >= 0) {
entry.status = ManifestStatus::Added;
entry.snapshot_id = Some(self.snapshot_id);
Expand All @@ -352,26 +352,92 @@ impl ManifestWriter {
Ok(())
}

/// Record `data_file` in this manifest as a newly added file.
///
/// The resulting entry has status `Added` and carries this manifest's snapshot ID.
/// `sequence_number` becomes the entry's data sequence number; a negative value is
/// treated as "not set" and stored as `None`. The file sequence number is left as
/// `None` because it will be assigned at commit.
///
/// # Errors
/// Propagates any error returned by `check_data_file` (the file's content type does
/// not fit this manifest's content type) or by `add_entry`.
pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) -> Result<()> {
    self.check_data_file(&data_file)?;
    // Negative sequence numbers are not valid, so they are dropped rather than stored.
    let data_sequence_number = if sequence_number >= 0 {
        Some(sequence_number)
    } else {
        None
    };
    self.add_entry(ManifestEntry {
        status: ManifestStatus::Added,
        snapshot_id: Some(self.snapshot_id),
        sequence_number: data_sequence_number,
        file_sequence_number: None,
        data_file,
    })
}

/// Add a delete manifest entry. This method will update the following fields of the entry:
/// - Update the entry status to `Deleted`
/// - Set the snapshot id to the current snapshot id
pub fn delete(&mut self, mut entry: ManifestEntry) -> Result<()> {
self.check_entry(&entry)?;
///
/// # TODO
/// Remove this allow later
#[allow(dead_code)]
pub(crate) fn delete(&mut self, mut entry: ManifestEntry) -> Result<()> {
self.check_data_file(&entry.data_file)?;
entry.status = ManifestStatus::Deleted;
entry.snapshot_id = Some(self.snapshot_id);
self.add_entry(entry)?;
Ok(())
}

/// Record `data_file` in this manifest as a deleted file.
///
/// The resulting entry has status `Deleted` and carries this manifest's snapshot ID.
/// The file's original data sequence number and file sequence number must be
/// preserved when it is marked as deleted, so the caller passes both in and they are
/// stored unchanged.
///
/// # Errors
/// Propagates any error returned by `check_data_file` (the file's content type does
/// not fit this manifest's content type) or by `add_entry`.
pub fn delete_file(
    &mut self,
    data_file: DataFile,
    sequence_number: i64,
    file_sequence_number: i64,
) -> Result<()> {
    self.check_data_file(&data_file)?;
    // The deletion is attributed to the snapshot this manifest is being written for.
    let deleting_snapshot_id = Some(self.snapshot_id);
    self.add_entry(ManifestEntry {
        status: ManifestStatus::Deleted,
        snapshot_id: deleting_snapshot_id,
        sequence_number: Some(sequence_number),
        file_sequence_number: Some(file_sequence_number),
        data_file,
    })
}

/// Add an existing manifest entry. This method will update the following field of the entry:
/// - Update the entry status to `Existing`
pub fn existing(&mut self, mut entry: ManifestEntry) -> Result<()> {
self.check_entry(&entry)?;
///
/// # TODO
/// Remove this allow later
#[allow(dead_code)]
pub(crate) fn existing(&mut self, mut entry: ManifestEntry) -> Result<()> {
self.check_data_file(&entry.data_file)?;
entry.status = ManifestStatus::Existing;
self.add_entry(entry)?;
Ok(())
}

/// Record `data_file` in this manifest as an already existing file.
///
/// The resulting entry has status `Existing`. The snapshot ID, data sequence number
/// and file sequence number that were assigned to the file at commit must be
/// preserved, so the caller supplies all three and they are stored unchanged.
///
/// # Errors
/// Propagates any error returned by `check_data_file` (the file's content type does
/// not fit this manifest's content type) or by `add_entry`.
pub fn existing_file(
    &mut self,
    data_file: DataFile,
    snapshot_id: i64,
    sequence_number: i64,
    file_sequence_number: i64,
) -> Result<()> {
    self.check_data_file(&data_file)?;
    // Unlike added or deleted entries, the snapshot ID comes from the caller rather
    // than from this writer.
    self.add_entry(ManifestEntry {
        status: ManifestStatus::Existing,
        snapshot_id: Some(snapshot_id),
        sequence_number: Some(sequence_number),
        file_sequence_number: Some(file_sequence_number),
        data_file,
    })
}

fn add_entry(&mut self, entry: ManifestEntry) -> Result<()> {
// Check if the entry has sequence number
if (entry.status == ManifestStatus::Deleted || entry.status == ManifestStatus::Existing)
Expand Down

0 comments on commit b701bd9

Please sign in to comment.