Skip to content

Commit

Permalink
fix: refresh snapshot after vacuuming logs
Browse files (browse the repository at this point in the history)
Signed-off-by: Ion Koutsouris <[email protected]>
  • Loading branch information
ion-elgreco committed Feb 25, 2025
1 parent 666179e commit cede5a8
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 114 deletions.
12 changes: 10 additions & 2 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
//! └───────────────────────────────┘
//!</pre>
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;

use bytes::Bytes;
Expand Down Expand Up @@ -766,7 +765,7 @@ impl PostCommit {
} else {
snapshot.advance(vec![&self.data])?;
}
let state = DeltaTableState { snapshot };
let mut state = DeltaTableState { snapshot };

let cleanup_logs = if let Some(cleanup_logs) = self.cleanup_expired_logs {
cleanup_logs
Expand Down Expand Up @@ -809,6 +808,15 @@ impl PostCommit {
Some(post_commit_operation_id),
)
.await? as u64;
if num_log_files_cleaned_up > 0 {
state = DeltaTableState::try_new(
&state.snapshot().table_root(),
self.log_store.object_store(None),
state.load_config().clone(),
Some(self.version),
)
.await?;
}
}

// Run arbitrary after_post_commit_hook code
Expand Down
16 changes: 15 additions & 1 deletion python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,21 @@ class RawDeltaTable:
post_commithook_properties: Optional[PostCommitHookProperties],
) -> None: ...
def __datafusion_table_provider__(self) -> Any: ...
# Type stub for the table-bound write method on RawDeltaTable.
# In this commit, `write_deltalake` in writer.py calls `table._table.write(...)`
# with exactly these keyword arguments when an existing table object is passed.
# Unlike the free-function path in writer.py, this signature takes no `table_uri`
# or `storage_options`.
# NOTE(review): presumably the table handle already carries its location and
# storage configuration — confirm against the native binding.
# Declared to return None; any resulting table state is not returned to the caller.
def write(
self,
data: pyarrow.RecordBatchReader,
partition_by: Optional[List[str]],
mode: str,
schema_mode: Optional[str],
predicate: Optional[str],
target_file_size: Optional[int],
name: Optional[str],
description: Optional[str],
configuration: Optional[Mapping[str, Optional[str]]],
writer_properties: Optional[WriterProperties],
commit_properties: Optional[CommitProperties],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> None: ...

def rust_core_version() -> str: ...
def write_new_deltalake(
Expand All @@ -253,7 +268,6 @@ def write_to_deltalake(
data: pyarrow.RecordBatchReader,
partition_by: Optional[List[str]],
mode: str,
table: Optional[RawDeltaTable],
schema_mode: Optional[str],
predicate: Optional[str],
target_file_size: Optional[int],
Expand Down
49 changes: 31 additions & 18 deletions python/deltalake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,25 +320,38 @@ def write_deltalake(
conversion_mode=ArrowSchemaConversionMode.PASSTHROUGH,
)
data = RecordBatchReader.from_batches(schema, (batch for batch in data))
write_deltalake_rust(
table_uri=table_uri,
data=data,
partition_by=partition_by,
mode=mode,
table=table._table if table is not None else None,
schema_mode=schema_mode,
predicate=predicate,
target_file_size=target_file_size,
name=name,
description=description,
configuration=configuration,
storage_options=storage_options,
writer_properties=writer_properties,
commit_properties=commit_properties,
post_commithook_properties=post_commithook_properties,
)
if table:
table.update_incremental()
table._table.write(
data=data,
partition_by=partition_by,
mode=mode,
schema_mode=schema_mode,
predicate=predicate,
target_file_size=target_file_size,
name=name,
description=description,
configuration=configuration,
writer_properties=writer_properties,
commit_properties=commit_properties,
post_commithook_properties=post_commithook_properties,
)
else:
write_deltalake_rust(
table_uri=table_uri,
data=data,
partition_by=partition_by,
mode=mode,
schema_mode=schema_mode,
predicate=predicate,
target_file_size=target_file_size,
name=name,
description=description,
configuration=configuration,
storage_options=storage_options,
writer_properties=writer_properties,
commit_properties=commit_properties,
post_commithook_properties=post_commithook_properties,
)
elif engine == "pyarrow":
warnings.warn(
"pyarrow engine is deprecated and will be removed in v1.0",
Expand Down
Loading

0 comments on commit cede5a8

Please sign in to comment.