diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index e390ecddcc..239da4d0c8 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -51,7 +51,7 @@ use lance_io::object_store::ObjectStore; use lance_table::{ format::{ pb::{self, IndexMetadata}, - DataStorageFormat, Fragment, Index, Manifest, RowIdMeta, + DataFile, DataStorageFormat, Fragment, Index, Manifest, RowIdMeta, }, io::{ commit::CommitHandler, @@ -136,6 +136,25 @@ pub enum Operation { /// Indices that have been updated with the new row addresses rewritten_indices: Vec, }, + /// Replace data in a column in the dataset with a new data. This is used for + /// null column population where we replace an entirely null column with a + /// new column that has data. + /// + /// This operation will only allow replacing files that contains the same schema + /// e.g. if the original files contains column A, B, C and the new files contains + /// only column A, B then the operation is not allowed. As we would need to split + /// the original files into two files, one with column A, B and the other with column C. + /// + /// Corollary to the above: the operation will also not allow replacing files layouts + /// that are not uniform across all fragments. + /// e.g. if fragments being replaced contains files with different schema layouts on + /// the column being replaced, the operation is not allowed. + /// say frag_1: [A] [B, C] and frag_2: [A, B] [C] and we are trying to replace column A + /// with a new column A the operation is not allowed. + DataReplacement { + old_fragments: Vec, + new_datafiles: Vec, + }, /// Merge a new column in Merge { fragments: Vec, @@ -229,6 +248,9 @@ impl Operation { .map(|f| f.id) .chain(removed_fragment_ids.iter().copied()), ), + Self::DataReplacement { old_fragments, .. } => { + Box::new(old_fragments.iter().map(|f| f.id)) + } } } @@ -332,6 +354,7 @@ impl Operation { Self::Update { .. } => "Update", Self::Project { .. } => "Project", Self::UpdateConfig { .. } => "UpdateConfig", + Self::DataReplacement { .. } => "DataReplacement", } } } @@ -370,6 +393,7 @@ impl Transaction { Operation::ReserveFragments { .. } => false, Operation::Project { .. } => false, Operation::UpdateConfig { .. } => false, + Operation::DataReplacement { .. } => false, _ => true, }, Operation::Rewrite { .. } => match &other.operation { @@ -411,6 +435,7 @@ impl Transaction { // if the rewrite changed more than X% of row ids. Operation::Rewrite { .. } => true, Operation::UpdateConfig { .. } => false, + Operation::DataReplacement { .. } => false, _ => true, }, Operation::Delete { .. } | Operation::Update { .. } => match &other.operation { @@ -744,6 +769,90 @@ impl Transaction { Operation::Restore { .. } => { unreachable!() } + Operation::DataReplacement { + old_fragments, + new_datafiles, + } => { + // 0. check we have the same number of old fragments as new data files + if old_fragments.len() != new_datafiles.len() { + return Err(Error::invalid_input( + "Number of old fragments must match number of new data files", + location!(), + )); + } + + // 1. make sure the new files all have the same fields + if new_datafiles + .iter() + .map(|f| f.fields.clone()) + .collect::>() + .len() + != 1 + { + // TODO: better error message to explain what's wrong + return Err(Error::invalid_input( + "All new data files must have the same fields", + location!(), + )); + } + + // 2. check that the fragments being modified have isomorphic layouts along the columns being replaced + // 3. add modified fragments to final_fragments + for (frag, new_file) in old_fragments.into_iter().zip(new_datafiles) { + let mut new_frag = frag.clone(); + + // TODO: check new file and fragment are the same length + + let mut columns_covered = HashSet::new(); + for file in &mut new_frag.files { + if file.fields == new_file.fields + && file.file_major_version == new_file.file_major_version + && file.file_minor_version == new_file.file_minor_version + { + // assign the new file path to the fragment + file.path = new_file.path.clone(); + } + columns_covered.extend(file.fields.iter()); + } + // SPECIAL CASE: if the column(s) being replaced are not covered by the fragment + // Then it means it's a all-NULL column that is being replaced with real data + // just add it to the final fragments + if columns_covered.is_disjoint(&new_file.fields.iter().collect()) { + new_frag.add_file( + new_file.path.clone(), + new_file.fields.clone(), + new_file.column_indices.clone(), + &LanceFileVersion::try_from_major_minor( + new_file.file_major_version, + new_file.file_minor_version, + ) + .expect("Expected valid file version"), + ); + } + + // Nothing changed in the current fragment, which is not expected -- error out + if &new_frag == frag { + // TODO: better error message to explain what's wrong + return Err(Error::invalid_input( + "Expected to modify the fragment but no changes were made", + location!(), + )); + } + final_fragments.push(new_frag); + } + + let fragments_changed = + final_fragments.iter().map(|f| f.id).collect::>(); + + // 4. push fragments that didn't change back to final_fragments + let unmodified_fragments = maybe_existing_fragments? + .iter() + .filter(|f| !fragments_changed.contains(&f.id)) + .cloned() + .collect::>(); + + final_fragments.extend(unmodified_fragments); + } }; // If a fragment was reserved then it may not belong at the end of the fragments list.