From ea025df713a250742bf8961eee7ccf6d528869d3 Mon Sep 17 00:00:00 2001
From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Date: Tue, 25 Feb 2025 17:44:48 +0100
Subject: [PATCH] fix: refresh snapshot after vacuuming logs

Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
---
 crates/core/src/operations/transaction/mod.rs |  12 +-
 python/deltalake/_internal.pyi                |  16 +-
 python/deltalake/writer.py                    |  49 ++--
 python/src/lib.rs                             | 240 +++++++++++-------
 python/tests/test_checkpoint.py               |  36 +++
 python/tests/test_threaded.py                 |  18 +-
 6 files changed, 258 insertions(+), 113 deletions(-)

diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs
index 5afe41be81..031d8e4986 100644
--- a/crates/core/src/operations/transaction/mod.rs
+++ b/crates/core/src/operations/transaction/mod.rs
@@ -74,7 +74,6 @@
 //! └───────────────────────────────┘
 //!
 use std::collections::HashMap;
-use std::future::Future;
 use std::sync::Arc;
 
 use bytes::Bytes;
@@ -766,7 +765,7 @@ impl PostCommit {
         } else {
             snapshot.advance(vec![&self.data])?;
         }
-        let state = DeltaTableState { snapshot };
+        let mut state = DeltaTableState { snapshot };
 
         let cleanup_logs = if let Some(cleanup_logs) = self.cleanup_expired_logs {
             cleanup_logs
@@ -809,6 +808,15 @@ impl PostCommit {
                     Some(post_commit_operation_id),
                 )
                 .await? as u64;
+                if num_log_files_cleaned_up > 0 {
+                    state = DeltaTableState::try_new(
+                        &state.snapshot().table_root(),
+                        self.log_store.object_store(None),
+                        state.load_config().clone(),
+                        Some(self.version),
+                    )
+                    .await?;
+                }
             }
 
             // Run arbitrary after_post_commit_hook code
diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi
index 025b5d47fc..1d9f362130 100644
--- a/python/deltalake/_internal.pyi
+++ b/python/deltalake/_internal.pyi
@@ -234,6 +234,21 @@ class RawDeltaTable:
         post_commithook_properties: Optional[PostCommitHookProperties],
     ) -> None: ...
     def __datafusion_table_provider__(self) -> Any: ...
+    def write(
+        self,
+        data: pyarrow.RecordBatchReader,
+        partition_by: Optional[List[str]],
+        mode: str,
+        schema_mode: Optional[str],
+        predicate: Optional[str],
+        target_file_size: Optional[int],
+        name: Optional[str],
+        description: Optional[str],
+        configuration: Optional[Mapping[str, Optional[str]]],
+        writer_properties: Optional[WriterProperties],
+        commit_properties: Optional[CommitProperties],
+        post_commithook_properties: Optional[PostCommitHookProperties],
+    ) -> None: ...
 
 def rust_core_version() -> str: ...
 def write_new_deltalake(
@@ -253,7 +268,6 @@ def write_to_deltalake(
     data: pyarrow.RecordBatchReader,
     partition_by: Optional[List[str]],
     mode: str,
-    table: Optional[RawDeltaTable],
     schema_mode: Optional[str],
     predicate: Optional[str],
     target_file_size: Optional[int],
diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index ba9c033c61..0767e99da9 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -320,25 +320,38 @@ def write_deltalake(
             conversion_mode=ArrowSchemaConversionMode.PASSTHROUGH,
         )
         data = RecordBatchReader.from_batches(schema, (batch for batch in data))
-        write_deltalake_rust(
-            table_uri=table_uri,
-            data=data,
-            partition_by=partition_by,
-            mode=mode,
-            table=table._table if table is not None else None,
-            schema_mode=schema_mode,
-            predicate=predicate,
-            target_file_size=target_file_size,
-            name=name,
-            description=description,
-            configuration=configuration,
-            storage_options=storage_options,
-            writer_properties=writer_properties,
-            commit_properties=commit_properties,
-            post_commithook_properties=post_commithook_properties,
-        )
         if table:
-            table.update_incremental()
+            table._table.write(
+                data=data,
+                partition_by=partition_by,
+                mode=mode,
+                schema_mode=schema_mode,
+                predicate=predicate,
+                target_file_size=target_file_size,
+                name=name,
+                description=description,
+                configuration=configuration,
+                writer_properties=writer_properties,
+                commit_properties=commit_properties,
+                post_commithook_properties=post_commithook_properties,
+            )
+        else:
+            write_deltalake_rust(
+                table_uri=table_uri,
+                data=data,
+                partition_by=partition_by,
+                mode=mode,
+                schema_mode=schema_mode,
+                predicate=predicate,
+                target_file_size=target_file_size,
+                name=name,
+                description=description,
+                configuration=configuration,
+                storage_options=storage_options,
+                writer_properties=writer_properties,
+                commit_properties=commit_properties,
+                post_commithook_properties=post_commithook_properties,
+            )
     elif engine == "pyarrow":
         warnings.warn(
             "pyarrow engine is deprecated and will be removed in v1.0",
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 668c647d7a..bddeb3cca0 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -1402,7 +1402,7 @@ impl RawDeltaTable {
     }
 
     pub fn cleanup_metadata(&self, py: Python) -> PyResult<()> {
-        py.allow_threads(|| {
+        let (_result, new_state) = py.allow_threads(|| {
            let operation_id = Uuid::new_v4();
            let handle = Arc::new(LakeFSCustomExecuteHandler {});
            let store = &self.log_store()?;
@@ -1419,10 +1419,29 @@ impl RawDeltaTable {
 
         let result = rt().block_on(async {
             match self._table.lock() {
-                Ok(table) => cleanup_metadata(&table, Some(operation_id))
-                    .await
-                    .map_err(PythonError::from)
-                    .map_err(PyErr::from),
+                Ok(table) => {
+                    let result = cleanup_metadata(&table, Some(operation_id))
+                        .await
+                        .map_err(PythonError::from)
+                        .map_err(PyErr::from)?;
+
+                    let new_state = if result > 0 {
+                        Some(
+                            DeltaTableState::try_new(
+                                &table.state.clone().unwrap().snapshot().table_root(),
+                                table.object_store(),
+                                table.config.clone(),
+                                Some(table.version()),
+                            )
+                            .await
+                            .map_err(PythonError::from)?,
+                        )
+                    } else {
+                        None
+                    };
+
+                    Ok((result, new_state))
+                }
                 Err(e) => Err(PyRuntimeError::new_err(e.to_string())),
             }
         });
@@ -1439,6 +1458,10 @@ impl RawDeltaTable {
             result
         })?;
 
+        if new_state.is_some() {
+            self.set_state(new_state)?;
+        }
+
         Ok(())
     }
 
@@ -1610,6 +1633,97 @@ impl RawDeltaTable {
         Ok(())
     }
 
+    #[allow(clippy::too_many_arguments)]
+    #[pyo3(signature = (data, mode, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))]
+    fn write(
+        &mut self,
+        py: Python,
+        data: PyArrowType<ArrowArrayStreamReader>,
+        mode: String,
+        schema_mode: Option<String>,
+        partition_by: Option<Vec<String>>,
+        predicate: Option<String>,
+        target_file_size: Option<usize>,
+        name: Option<String>,
+        description: Option<String>,
+        configuration: Option<HashMap<String, Option<String>>>,
+        writer_properties: Option<PyWriterProperties>,
+        commit_properties: Option<PyCommitProperties>,
+        post_commithook_properties: Option<PyPostCommitHookProperties>,
+    ) -> PyResult<()> {
+        let table = py.allow_threads(|| {
+            let save_mode = mode.parse().map_err(PythonError::from)?;
+
+            let mut builder = WriteBuilder::new(
+                self.log_store()?,
+                self.with_table(|t| Ok(t.state.clone()))?,
+                // Take the Option since it might be the first write,
+                // triggered through `write_to_deltalake`
+            )
+            .with_save_mode(save_mode);
+
+            let table_provider = to_lazy_table(data.0).map_err(PythonError::from)?;
+
+            let plan = LogicalPlanBuilder::scan("source", provider_as_source(table_provider), None)
+                .map_err(PythonError::from)?
+                .build()
+                .map_err(PythonError::from)?;
+
+            builder = builder.with_input_execution_plan(Arc::new(plan));
+
+            if let Some(schema_mode) = schema_mode {
+                builder = builder.with_schema_mode(schema_mode.parse().map_err(PythonError::from)?);
+            }
+            if let Some(partition_columns) = partition_by {
+                builder = builder.with_partition_columns(partition_columns);
+            }
+
+            if let Some(writer_props) = writer_properties {
+                builder = builder.with_writer_properties(
+                    set_writer_properties(writer_props).map_err(PythonError::from)?,
+                );
+            }
+
+            if let Some(name) = &name {
+                builder = builder.with_table_name(name);
+            };
+
+            if let Some(description) = &description {
+                builder = builder.with_description(description);
+            };
+
+            if let Some(predicate) = predicate {
+                builder = builder.with_replace_where(predicate);
+            };
+
+            if let Some(target_file_size) = target_file_size {
+                builder = builder.with_target_file_size(target_file_size)
+            };
+
+            if let Some(config) = configuration {
+                builder = builder.with_configuration(config);
+            };
+
+            if let Some(commit_properties) =
+                maybe_create_commit_properties(commit_properties, post_commithook_properties)
+            {
+                builder = builder.with_commit_properties(commit_properties);
+            };
+
+            if self.log_store()?.name() == "LakeFSLogStore" {
+                builder =
+                    builder.with_custom_execute_handler(Arc::new(LakeFSCustomExecuteHandler {}))
+            }
+
+            rt().block_on(builder.into_future())
+                .map_err(PythonError::from)
+                .map_err(PyErr::from)
+        })?;
+
+        self.set_state(table.state)?;
+        Ok(())
+    }
+
     fn __datafusion_table_provider__<'py>(
         &self,
         py: Python<'py>,
@@ -2137,13 +2251,12 @@ pub struct PyCommitProperties {
 
 #[pyfunction]
 #[allow(clippy::too_many_arguments)]
-#[pyo3(signature = (table_uri, data, mode, table=None, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, storage_options=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))]
+#[pyo3(signature = (table_uri, data, mode, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, storage_options=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))]
 fn write_to_deltalake(
     py: Python,
     table_uri: String,
     data: PyArrowType<ArrowArrayStreamReader>,
     mode: String,
-    table: Option<&RawDeltaTable>,
     schema_mode: Option<String>,
     partition_by: Option<Vec<String>>,
     predicate: Option<String>,
@@ -2156,92 +2269,39 @@ fn write_to_deltalake(
     commit_properties: Option<PyCommitProperties>,
     post_commithook_properties: Option<PyPostCommitHookProperties>,
 ) -> PyResult<()> {
-    py.allow_threads(|| {
-        let save_mode = mode.parse().map_err(PythonError::from)?;
-
+    let raw_table: DeltaResult<RawDeltaTable> = py.allow_threads(|| {
         let options = storage_options.clone().unwrap_or_default();
-        let table = if let Some(table) = table {
-            table.with_table(|t| Ok(DeltaOps::from(t.clone())))?
-        } else {
-            rt().block_on(DeltaOps::try_from_uri_with_storage_options(
+        let table = rt()
+            .block_on(DeltaOps::try_from_uri_with_storage_options(
                 &table_uri,
                 options,
-            ))
-            .map_err(PythonError::from)?
+            ))?
+            .0;
+
+        let raw_table = RawDeltaTable {
+            _table: Arc::new(Mutex::new(table)),
+            _config: FsConfig {
+                root_url: table_uri,
+                options: storage_options.unwrap_or_default(),
+            },
         };
-
-        // let dont_be_so_lazy = match table.0.state.as_ref() {
-        //     Some(state) => state.table_config().enable_change_data_feed() && predicate.is_some(),
-        //     // You don't have state somehow, so I guess it's okay to be lazy.
-        //     _ => false,
-        // };
-
-        let mut builder =
-            WriteBuilder::new(table.0.log_store(), table.0.state).with_save_mode(save_mode);
-
-        // if dont_be_so_lazy {
-        //     debug!(
-        //         "write_to_deltalake() is not able to lazily perform a write, collecting batches"
-        //     );
-        //     builder = builder.with_input_batches(data.0.map(|batch| batch.unwrap()));
-        // } else {
-
-        let table_provider = to_lazy_table(data.0).map_err(PythonError::from)?;
-
-        let plan = LogicalPlanBuilder::scan("source", provider_as_source(table_provider), None)
-            .map_err(PythonError::from)?
-            .build()
-            .map_err(PythonError::from)?;
-        builder = builder.with_input_execution_plan(Arc::new(plan));
-        // }
-
-        if let Some(schema_mode) = schema_mode {
-            builder = builder.with_schema_mode(schema_mode.parse().map_err(PythonError::from)?);
-        }
-        if let Some(partition_columns) = partition_by {
-            builder = builder.with_partition_columns(partition_columns);
-        }
-
-        if let Some(writer_props) = writer_properties {
-            builder = builder.with_writer_properties(
-                set_writer_properties(writer_props).map_err(PythonError::from)?,
-            );
-        }
-
-        if let Some(name) = &name {
-            builder = builder.with_table_name(name);
-        };
-
-        if let Some(description) = &description {
-            builder = builder.with_description(description);
-        };
-
-        if let Some(predicate) = predicate {
-            builder = builder.with_replace_where(predicate);
-        };
-
-        if let Some(target_file_size) = target_file_size {
-            builder = builder.with_target_file_size(target_file_size)
-        };
-
-        if let Some(config) = configuration {
-            builder = builder.with_configuration(config);
-        };
-
-        if let Some(commit_properties) =
-            maybe_create_commit_properties(commit_properties, post_commithook_properties)
-        {
-            builder = builder.with_commit_properties(commit_properties);
-        };
-
-        if table_uri.starts_with("lakefs://") {
-            builder = builder.with_custom_execute_handler(Arc::new(LakeFSCustomExecuteHandler {}))
-        }
-
-        rt().block_on(builder.into_future())
-            .map_err(PythonError::from)?;
-
-        Ok(())
-    })
+        Ok(raw_table)
+    });
+
+    raw_table.map_err(PythonError::from)?.write(
+        py,
+        data,
+        mode,
+        schema_mode,
+        partition_by,
+        predicate,
+        target_file_size,
+        name,
+        description,
+        configuration,
+        writer_properties,
+        commit_properties,
+        post_commithook_properties,
+    )
 }
 
 #[pyfunction]
diff --git a/python/tests/test_checkpoint.py b/python/tests/test_checkpoint.py
index 2bbc04fac5..d3f8a61d62 100644
--- a/python/tests/test_checkpoint.py
+++ b/python/tests/test_checkpoint.py
@@ -539,3 +539,39 @@ def test_checkpoint_with_multiple_writes(tmp_path: pathlib.Path):
     new_df = dt.to_pandas()
     print(dt.to_pandas())
     assert len(new_df) == 1, "We overwrote! there should only be one row"
+
+
+@pytest.mark.polars
+def test_refresh_snapshot_after_log_cleanup_3057(tmp_path):
+    """https://github.com/delta-io/delta-rs/issues/3057"""
+    import polars as pl
+
+    configuration = {
+        "delta.deletedFileRetentionDuration": "interval 0 days",
+        "delta.logRetentionDuration": "interval 0 days",
+        "delta.targetFileSize": str(128 * 1024 * 1024),
+    }
+
+    for i in range(2):
+        df = pl.DataFrame({"foo": [i]})
+        df.write_delta(
+            str(tmp_path),
+            delta_write_options={"configuration": configuration},
+            mode="append",
+        )
+
+    # Create a checkpoint so that the logs written before it can be removed
+    dt = DeltaTable(tmp_path)
+    dt.create_checkpoint()
+
+    # Write to the table again; the snapshot should be refreshed so that cleanup_metadata can run after this
+    df = pl.DataFrame({"foo": [1]})
+    df.write_delta(dt, mode="append")
+
+    # Vacuum is a no-op, since the expired logs were already removed and the refreshed snapshot no longer references them
+
+    vacuum_log = dt.vacuum(
+        retention_hours=0, enforce_retention_duration=False, dry_run=False
+    )
+
+    assert vacuum_log == []
diff --git a/python/tests/test_threaded.py b/python/tests/test_threaded.py
index b9a4a97908..9839e79713 100644
--- a/python/tests/test_threaded.py
+++ b/python/tests/test_threaded.py
@@ -47,7 +47,7 @@ def comp():
 
 
 @pytest.mark.polars
-def test_multithreaded_write(sample_data: pa.Table, tmp_path: pathlib.Path):
+def test_multithreaded_write_using_table(sample_data: pa.Table, tmp_path: pathlib.Path):
     import polars as pl
 
     table = pl.DataFrame({"a": [1, 2, 3]}).to_arrow()
@@ -55,5 +55,19 @@ def test_multithreaded_write(sample_data: pa.Table, tmp_path: pathlib.Path):
 
     dt = DeltaTable(tmp_path)
 
+    with pytest.raises(RuntimeError, match="borrowed"):
+        with ThreadPoolExecutor() as exe:
+            list(exe.map(lambda _: write_deltalake(dt, table, mode="append"), range(5)))
+
+
+@pytest.mark.polars
+def test_multithreaded_write_using_path(sample_data: pa.Table, tmp_path: pathlib.Path):
+    import polars as pl
+
+    table = pl.DataFrame({"a": [1, 2, 3]}).to_arrow()
+    write_deltalake(tmp_path, table, mode="overwrite")
+
     with ThreadPoolExecutor() as exe:
-        list(exe.map(lambda _: write_deltalake(dt, table, mode="append"), range(5)))
+        list(
+            exe.map(lambda _: write_deltalake(tmp_path, table, mode="append"), range(5))
+        )
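
For context, a minimal sketch (not part of the patch) of the scenario this change addresses, written against the public deltalake Python API with pyarrow instead of polars; the table path and data below are illustrative only.

import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

table_path = "/tmp/delta_3057_demo"  # hypothetical location
config = {
    "delta.deletedFileRetentionDuration": "interval 0 days",
    "delta.logRetentionDuration": "interval 0 days",
}

# Two commits so _delta_log has entries that can expire.
write_deltalake(table_path, pa.table({"foo": [0]}), mode="append", configuration=config)
write_deltalake(table_path, pa.table({"foo": [1]}), mode="append")

dt = DeltaTable(table_path)
dt.create_checkpoint()  # allows log entries before the checkpoint to be removed
dt.cleanup_metadata()   # removes expired log files; the fix refreshes the snapshot here

# With a stale snapshot, follow-up operations on the same DeltaTable object could
# still reference the deleted commit files; with this patch they run against a
# refreshed snapshot instead.
write_deltalake(dt, pa.table({"foo": [2]}), mode="append")
dt.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False)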