
Commit

Merge branch 'main' into fix/pr-rule
roeap authored Feb 25, 2025
2 parents f19b4b3 + 4be81fb commit dbcc3fb
Showing 18 changed files with 425 additions and 185 deletions.
1 change: 1 addition & 0 deletions crates/aws/src/credentials.rs
@@ -41,6 +41,7 @@ impl AWSForObjectStore {
}

/// Return true if a credential has been cached
#[cfg(test)]
async fn has_cached_credentials(&self) -> bool {
let guard = self.cache.lock().await;
(*guard).is_some()
2 changes: 2 additions & 0 deletions crates/core/src/kernel/models/fields.rs
@@ -145,6 +145,8 @@ static REMOVE_FIELD: LazyLock<StructField> = LazyLock::new(|| {
true,
)
});
// TODO implement support for this checkpoint
#[expect(dead_code)]
static REMOVE_FIELD_CHECKPOINT: LazyLock<StructField> = LazyLock::new(|| {
StructField::new(
"remove",
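The two attribute additions above take different routes to the same goal: keep an item that currently has no non-test callers without tripping the dead-code lint. A minimal standalone sketch of both idioms (function names are made up; this is not delta-rs code):

// Compiled only under `cargo test`, so non-test builds never contain the
// function and never warn about it (the credentials.rs approach).
#[cfg(test)]
fn has_cached_value() -> bool {
    true
}

// Kept in every build, with the dead_code lint explicitly expected
// (the fields.rs approach; `#[expect]` needs Rust 1.81+, older toolchains
// can use `#[allow(dead_code)]` instead).
#[expect(dead_code)]
fn reserved_for_checkpoint_support() -> &'static str {
    "remove"
}

fn main() {}

#[cfg(test)]
mod tests {
    #[test]
    fn helper_only_exists_in_test_builds() {
        assert!(super::has_cached_value());
    }
}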
6 changes: 3 additions & 3 deletions crates/core/src/kernel/scalars.rs
@@ -72,8 +72,8 @@ impl ScalarExt for Scalar {
},
Self::Binary(val) => create_escaped_binary_string(val.as_slice()),
Self::Null(_) => "null".to_string(),
Self::Struct(_) => unimplemented!(),
Self::Array(_) => unimplemented!(),
Self::Struct(_) => self.to_string(),
Self::Array(_) => self.to_string(),
}
}

@@ -85,7 +85,7 @@ impl ScalarExt for Scalar {
encode(Path::from(self.serialize()).as_ref()).to_string()
}

/// Create a [`Scalar`] form a row in an arrow array.
/// Create a [`Scalar`] from a row in an arrow array.
fn from_array(arr: &dyn Array, index: usize) -> Option<Self> {
use arrow_array::*;
use arrow_schema::DataType::*;
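The serialize() change above replaces a panic with a best-effort fallback: struct and array scalars now reuse their Display rendering instead of hitting unimplemented!(). A compact sketch of that pattern with a made-up value type (not the kernel Scalar):

use std::fmt;

// Hypothetical value type standing in for the kernel Scalar.
enum Value {
    Int(i64),
    Array(Vec<Value>),
}

impl fmt::Display for Value {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Value::Int(i) => write!(f, "{i}"),
            Value::Array(items) => {
                let parts: Vec<String> = items.iter().map(|v| v.to_string()).collect();
                write!(f, "[{}]", parts.join(", "))
            }
        }
    }
}

impl Value {
    fn serialize(&self) -> String {
        match self {
            Value::Int(i) => i.to_string(),
            // Previously: unimplemented!() — serializing any array panicked.
            // Now: fall back to the Display rendering.
            Value::Array(_) => self.to_string(),
        }
    }
}

fn main() {
    let v = Value::Array(vec![Value::Int(1), Value::Int(2)]);
    assert_eq!(v.serialize(), "[1, 2]");
}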
36 changes: 35 additions & 1 deletion crates/core/src/kernel/snapshot/log_data.rs
@@ -13,7 +13,7 @@ use percent_encoding::percent_decode_str;
use super::super::scalars::ScalarExt;
use crate::kernel::arrow::extract::{extract_and_cast, extract_and_cast_opt};
use crate::kernel::{
DataType, DeletionVectorDescriptor, Metadata, Remove, StructField, StructType,
Add, DataType, DeletionVectorDescriptor, Metadata, Remove, StructField, StructType,
};
use crate::{DeltaResult, DeltaTableError};

@@ -284,6 +284,40 @@ impl LogicalFile<'_> {
.and_then(|c| Scalar::from_array(c.as_ref(), self.index))
}

pub fn add_action(&self) -> Add {
Add {
path: self.path().to_string(),
partition_values: self
.partition_values()
.ok()
.map(|pv| {
pv.iter()
.map(|(k, v)| {
(
k.to_string(),
if v.is_null() {
None
} else {
Some(v.serialize())
},
)
})
.collect()
})
.unwrap_or_default(),
size: self.size(),
modification_time: self.modification_time(),
data_change: true,
stats: Scalar::from_array(self.stats as &dyn Array, self.index).map(|s| s.serialize()),
tags: None,
deletion_vector: self.deletion_vector().map(|dv| dv.descriptor()),
base_row_id: None,
default_row_commit_version: None,
clustering_provider: None,
stats_parsed: None,
}
}

/// Create a remove action for this logical file.
pub fn remove_action(&self, data_change: bool) -> Remove {
Remove {
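The new add_action() above folds partition values into the Add action's HashMap<String, Option<String>>, mapping null scalars to None and serializing everything else to its string form. A self-contained sketch of just that mapping, using a made-up scalar type rather than the kernel Scalar:

use std::collections::HashMap;

// Hypothetical scalar type; only the null-vs-value distinction matters here.
enum PartitionValue {
    Null,
    Int(i64),
}

impl PartitionValue {
    fn is_null(&self) -> bool {
        matches!(self, PartitionValue::Null)
    }

    fn serialize(&self) -> String {
        match self {
            PartitionValue::Null => "null".to_string(),
            PartitionValue::Int(i) => i.to_string(),
        }
    }
}

// Null partition values become None; everything else is serialized, matching
// the Add action's partition_values layout.
fn to_partition_values(pairs: &[(&str, PartitionValue)]) -> HashMap<String, Option<String>> {
    pairs
        .iter()
        .map(|(k, v)| {
            (
                k.to_string(),
                if v.is_null() { None } else { Some(v.serialize()) },
            )
        })
        .collect()
}

fn main() {
    let values = to_partition_values(&[
        ("year", PartitionValue::Int(2025)),
        ("country", PartitionValue::Null),
    ]);
    assert_eq!(values["year"], Some("2025".to_string()));
    assert_eq!(values["country"], None);
    println!("{values:?}");
}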
1 change: 1 addition & 0 deletions crates/core/src/operations/filesystem_check.rs
@@ -80,6 +80,7 @@ where
}

// Custom deserialization that parses a JSON string into MetricDetails
#[expect(dead_code)]
fn deserialize_vec_string<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
where
D: Deserializer<'de>,
1 change: 0 additions & 1 deletion crates/core/src/operations/load_cdf.rs
@@ -487,7 +487,6 @@ pub(crate) mod tests {
use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_schema::Schema;
use chrono::NaiveDateTime;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_common::assert_batches_sorted_eq;
use itertools::Itertools;
1 change: 0 additions & 1 deletion crates/core/src/operations/merge/mod.rs
@@ -1573,7 +1573,6 @@ mod tests {
use arrow_schema::DataType as ArrowDataType;
use arrow_schema::Field;
use datafusion::assert_batches_sorted_eq;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use datafusion_common::Column;
use datafusion_common::TableReference;
108 changes: 50 additions & 58 deletions crates/core/src/operations/optimize.rs
@@ -46,6 +46,7 @@ use super::transaction::PROTOCOL;
use super::writer::{PartitionWriter, PartitionWriterConfig};
use super::{CustomExecuteHandler, Operation};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::Add;
use crate::kernel::{scalars::ScalarExt, Action, PartitionsExt, Remove};
use crate::logstore::LogStoreRef;
use crate::operations::transaction::{CommitBuilder, CommitProperties, DEFAULT_RETRIES};
@@ -55,6 +56,8 @@ use crate::table::state::DeltaTableState;
use crate::writer::utils::arrow_schema_without_partitions;
use crate::{crate_version, DeltaTable, ObjectMeta, PartitionFilter};

#[cfg(feature = "datafusion")]
use crate::delta_datafusion::DeltaTableProvider;
/// Metrics from Optimize
#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -484,7 +487,7 @@ impl MergePlan {
.iter()
.map(|file_meta| {
create_remove(
file_meta.location.as_ref(),
file_meta.path.as_ref(),
&partition_values,
file_meta.size as i64,
)
@@ -626,44 +629,15 @@ impl MergePlan {
async fn read_zorder(
files: MergeBin,
context: Arc<zorder::ZOrderExecContext>,
table_provider: DeltaTableProvider,
) -> Result<BoxStream<'static, Result<RecordBatch, ParquetError>>, DeltaTableError> {
use datafusion::prelude::{col, ParquetReadOptions};
use datafusion_common::Column;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{Expr, ScalarUDF};

// This code is ... not ideal. Essentially `read_parquet` expects Strings that it will then
// parse as URLs and then pass back to the object store (x_x). This can cause problems when
// paths in object storage have special characters like spaces, etc.
//
// This [str::replace] i kind of a hack to address
// <https://github.com/delta-io/delta-rs/issues/2834 >
let locations: Vec<String> = files
.iter()
.map(|om| {
format!(
"delta-rs:///{}",
str::replace(om.location.as_ref(), "%", "%25")
)
})
.collect();
debug!("Reading z-order with locations are: {locations:?}");

let df = context
.ctx
// TODO: should read options have the partition columns
.read_parquet(locations, ParquetReadOptions::default())
.await?;

let original_columns = df
.schema()
.fields()
.iter()
.map(|f| Expr::Column(Column::from_qualified_name_ignore_case(f.name())))
.collect_vec();
use datafusion_expr::{col, Expr, ScalarUDF};

let provider = table_provider.with_files(files.files);
let df = context.ctx.read_table(Arc::new(provider))?;

// Add a temporary z-order column we will sort by, and then drop.
const ZORDER_KEY_COLUMN: &str = "__zorder_key";
let cols = context
.columns
.iter()
Expand All @@ -673,10 +647,7 @@ impl MergePlan {
Arc::new(ScalarUDF::from(zorder::datafusion::ZOrderUDF)),
cols,
));
let df = df.with_column(ZORDER_KEY_COLUMN, expr)?;

let df = df.sort(vec![col(ZORDER_KEY_COLUMN).sort(true, true)])?;
let df = df.select(original_columns)?;
let df = df.sort(vec![expr.sort(true, true)])?;

let stream = df
.execute_stream()
@@ -717,14 +688,17 @@ impl MergePlan {
partition,
);
for file in files.iter() {
debug!(" file {}", file.location);
debug!(" file {}", file.path);
}
let object_store_ref = log_store.object_store(Some(operation_id));
let batch_stream = futures::stream::iter(files.clone())
.then(move |file| {
let object_store_ref = object_store_ref.clone();
async move {
let file_reader = ParquetObjectReader::new(object_store_ref, file);
let file_reader = ParquetObjectReader::new(
object_store_ref,
ObjectMeta::try_from(file).unwrap(),
);
ParquetRecordBatchStreamBuilder::new(file_reader)
.await?
.build()
@@ -765,10 +739,30 @@ impl MergePlan {
)?);
let task_parameters = self.task_parameters.clone();

use crate::delta_datafusion::DataFusionMixins;
use crate::delta_datafusion::DeltaScanConfigBuilder;
use crate::delta_datafusion::DeltaTableProvider;

let scan_config = DeltaScanConfigBuilder::default()
.with_file_column(false)
.with_schema(snapshot.input_schema()?)
.build(&snapshot)?;

// For each rewrite evaluate the predicate and then modify each expression
// to either compute the new value or obtain the old one then write these batches
let log_store = log_store.clone();
futures::stream::iter(bins)
.map(move |(_, (partition, files))| {
let batch_stream = Self::read_zorder(files.clone(), exec_context.clone());
let batch_stream = Self::read_zorder(
files.clone(),
exec_context.clone(),
DeltaTableProvider::try_new(
snapshot.clone(),
log_store.clone(),
scan_config.clone(),
)
.unwrap(),
);
let rewrite_result = tokio::task::spawn(Self::rewrite_files(
task_parameters.clone(),
partition,
@@ -797,6 +791,7 @@

let mut last_commit = Instant::now();
let mut commits_made = 0;
let mut snapshot = snapshot.clone();
loop {
let next = stream.next().await.transpose()?;

@@ -836,18 +831,18 @@

debug!("committing {} actions", actions.len());

CommitBuilder::from(properties)
let commit = CommitBuilder::from(properties)
.with_actions(actions)
.with_operation_id(operation_id)
.with_post_commit_hook_handler(handle.cloned())
.with_max_retries(DEFAULT_RETRIES + commits_made)
.build(
Some(snapshot),
Some(&snapshot),
log_store.clone(),
self.task_parameters.input_parameters.clone().into(),
)
.await?;

snapshot = commit.snapshot();
commits_made += 1;
}

Expand All @@ -864,7 +859,7 @@ impl MergePlan {
total_metrics.files_removed.min = 0;
}

table.update().await?;
table.state = Some(snapshot);

Ok(total_metrics)
}
@@ -915,7 +910,7 @@ pub fn create_merge_plan(
/// A collection of bins for a particular partition
#[derive(Debug, Clone)]
struct MergeBin {
files: Vec<ObjectMeta>,
files: Vec<Add>,
size_bytes: i64,
}

@@ -935,18 +930,18 @@ impl MergeBin {
self.files.len()
}

fn add(&mut self, meta: ObjectMeta) {
self.size_bytes += meta.size as i64;
self.files.push(meta);
fn add(&mut self, add: Add) {
self.size_bytes += add.size as i64;
self.files.push(add);
}

fn iter(&self) -> impl Iterator<Item = &ObjectMeta> {
fn iter(&self) -> impl Iterator<Item = &Add> {
self.files.iter()
}
}

impl IntoIterator for MergeBin {
type Item = ObjectMeta;
type Item = Add;
type IntoIter = std::vec::IntoIter<Self::Item>;

fn into_iter(self) -> Self::IntoIter {
@@ -961,8 +956,7 @@ fn build_compaction_plan(
) -> Result<(OptimizeOperations, Metrics), DeltaTableError> {
let mut metrics = Metrics::default();

let mut partition_files: HashMap<String, (IndexMap<String, Scalar>, Vec<ObjectMeta>)> =
HashMap::new();
let mut partition_files: HashMap<String, (IndexMap<String, Scalar>, Vec<Add>)> = HashMap::new();
for add in snapshot.get_active_add_actions_by_partitions(filters)? {
let add = add?;
metrics.total_considered_files += 1;
@@ -981,7 +975,7 @@ fn build_compaction_plan(
.entry(add.partition_values()?.hive_partition_path())
.or_insert_with(|| (partition_values, vec![]))
.1
.push(object_meta);
.push(add.add_action());
}

for (_, file) in partition_files.values_mut() {
@@ -1075,13 +1069,11 @@ fn build_zorder_plan(
.map(|(k, v)| (k.to_string(), v))
.collect::<IndexMap<_, _>>();
metrics.total_considered_files += 1;
let object_meta = ObjectMeta::try_from(&add)?;

partition_files
.entry(partition_values.hive_partition_path())
.or_insert_with(|| (partition_values, MergeBin::new()))
.1
.add(object_meta);
.add(add.add_action());
debug!("partition_files inside the zorder plan: {partition_files:?}");
}

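The read_zorder rewrite above drops the string-based read_parquet path and, with it, the escaping workaround for delta-io/delta-rs#2834, where a literal '%' in an object-store path broke the URL round-trip. A standalone sketch of the retired workaround (plain std, illustrative only, not delta-rs code):

// read_parquet treated file locations as URLs, so a literal '%' in an
// object-store path had to be escaped as '%25' before being handed over,
// then percent-decoded again on the way back to the store.
fn escape_location(path: &str) -> String {
    format!("delta-rs:///{}", path.replace('%', "%25"))
}

fn main() {
    // A path that already contains a percent-encoded space.
    let raw = "part-00001-a%20b.c000.snappy.parquet";
    assert_eq!(
        escape_location(raw),
        "delta-rs:///part-00001-a%2520b.c000.snappy.parquet"
    );
    // Reading through DeltaTableProvider::with_files avoids the string
    // round-trip entirely, which is why the new code no longer needs this.
    println!("{}", escape_location(raw));
}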
12 changes: 10 additions & 2 deletions crates/core/src/operations/transaction/mod.rs
@@ -74,7 +74,6 @@
//! └───────────────────────────────┘
//!</pre>
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;

use bytes::Bytes;
@@ -766,7 +765,7 @@ impl PostCommit {
} else {
snapshot.advance(vec![&self.data])?;
}
let state = DeltaTableState { snapshot };
let mut state = DeltaTableState { snapshot };

let cleanup_logs = if let Some(cleanup_logs) = self.cleanup_expired_logs {
cleanup_logs
@@ -809,6 +808,15 @@ impl PostCommit {
Some(post_commit_operation_id),
)
.await? as u64;
if num_log_files_cleaned_up > 0 {
state = DeltaTableState::try_new(
&state.snapshot().table_root(),
self.log_store.object_store(None),
state.load_config().clone(),
Some(self.version),
)
.await?;
}
}

// Run arbitrary after_post_commit_hook code
1 change: 0 additions & 1 deletion crates/core/src/operations/update.rs
@@ -553,7 +553,6 @@ mod tests {
use arrow::record_batch::RecordBatch;
use arrow_schema::DataType;
use datafusion::assert_batches_sorted_eq;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use serde_json::json;
use std::sync::Arc;