From f8bc074d82c3ed56ffef1c1df728af6258e0797f Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Wed, 27 Nov 2024 12:49:50 -0800 Subject: [PATCH] new Snapshot::new_from() API --- kernel/src/snapshot.rs | 42 ++++++++++++++++++++++++++++++++- kernel/src/table_changes/mod.rs | 2 +- 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 30063da3e..852bba40f 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -53,7 +53,8 @@ impl Snapshot { /// /// - `table_root`: url pointing at the table root (where `_delta_log` folder is located) /// - `engine`: Implementation of [`Engine`] apis. - /// - `version`: target version of the [`Snapshot`] + /// - `version`: target version of the [`Snapshot`]. None will create a snapshot at the latest + /// version of the table. pub fn try_new( table_root: Url, engine: &dyn Engine, @@ -71,6 +72,26 @@ impl Snapshot { Self::try_new_from_log_segment(table_root, log_segment, engine) } + /// Create a new [`Snapshot`] instance from an existing [`Snapshot`]. This is useful when you + /// already have a [`Snapshot`] lying around and want to do the minimal work to 'update' the + /// snapshot to a later version. + /// + /// # Parameters + /// + /// - `existing_snapshot`: reference to an existing [`Snapshot`] + /// - `engine`: Implementation of [`Engine`] apis. + /// - `version`: target version of the [`Snapshot`]. None will create a snapshot at the latest + /// version of the table. + pub fn new_from( + existing_snapshot: &Snapshot, + engine: &dyn Engine, + version: Option, + ) -> DeltaResult { + // TODO(zach): for now we just pass through to the old API. We should instead optimize this + // to avoid replaying overlapping LogSegments. + Self::try_new(existing_snapshot.table_root.clone(), engine, version) + } + /// Create a new [`Snapshot`] instance. pub(crate) fn try_new_from_log_segment( location: Url, @@ -250,6 +271,25 @@ mod tests { assert_eq!(snapshot.schema(), &expected); } + #[test] + fn test_snapshot_new_from() { + let path = + std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap(); + let url = url::Url::from_directory_path(path).unwrap(); + + let engine = SyncEngine::new(); + let old_snapshot = Snapshot::try_new(url, &engine, Some(0)).unwrap(); + let snapshot = Snapshot::new_from(&old_snapshot, &engine, Some(0)).unwrap(); + + let expected = + Protocol::try_new(3, 7, Some(["deletionVectors"]), Some(["deletionVectors"])).unwrap(); + assert_eq!(snapshot.protocol(), &expected); + + let schema_string = r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#; + let expected: StructType = serde_json::from_str(schema_string).unwrap(); + assert_eq!(snapshot.schema(), &expected); + } + #[test] fn test_read_table_with_last_checkpoint() { let path = std::fs::canonicalize(PathBuf::from( diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index 2005762f9..3cfa6c726 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -90,7 +90,7 @@ impl TableChanges { // supported for every protocol action in the CDF range. let start_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, Some(start_version))?; - let end_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, end_version)?; + let end_snapshot = Snapshot::new_from(&start_snapshot, engine, end_version)?; // Verify CDF is enabled at the beginning and end of the interval to fail early. We must // still check that CDF is enabled for every metadata action in the CDF range.