Commit

Merge branch 'main' into alamb/unpin_home
rtyler authored Feb 28, 2025
2 parents 3be70b2 + f495ee6 commit 86677fb
Showing 5 changed files with 177 additions and 6 deletions.
6 changes: 6 additions & 0 deletions crates/core/src/kernel/models/actions.rs
@@ -111,6 +111,12 @@ impl Metadata {
self
}

/// set the table ID in the metadata action
pub fn with_table_id(mut self, id: String) -> Self {
self.id = id;
self
}

/// get the table schema
pub fn schema(&self) -> DeltaResult<StructType> {
Ok(serde_json::from_str(&self.schema_string)?)
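The hunk above adds a builder-style setter for the table ID. As a minimal sketch (illustration only, not part of this commit's diff), the new method chains with the existing Metadata constructor; existing_table_id is a placeholder name for an ID read from the current snapshot:

// Illustrative sketch, not from the diff: carry an existing table ID forward
// when rebuilding the Metadata action, so readers do not see a "new" table.
let metadata = Metadata::try_new(schema_struct, partition_columns, configuration)?
    .with_table_id(existing_table_id);
let action = Action::Metadata(metadata);

The write path changed below in crates/core/src/operations/write/mod.rs applies this same pattern, guarded by a check that the snapshot's existing ID is non-empty.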
13 changes: 8 additions & 5 deletions crates/core/src/operations/write/mod.rs
@@ -555,11 +555,14 @@ impl std::future::IntoFuture for WriteBuilder {
.apply_column_metadata_to_protocol(&schema_struct)?
.move_table_properties_into_features(&configuration);

let schema_action = Action::Metadata(Metadata::try_new(
schema_struct,
partition_columns.clone(),
configuration,
)?);
let mut metadata =
Metadata::try_new(schema_struct, partition_columns.clone(), configuration)?;
let existing_metadata_id = snapshot.metadata().id.clone();

if !existing_metadata_id.is_empty() {
metadata = metadata.with_table_id(existing_metadata_id);
}
let schema_action = Action::Metadata(metadata);
actions.push(schema_action);
if current_protocol != &new_protocol {
actions.push(new_protocol.into())
78 changes: 77 additions & 1 deletion python/tests/pyspark_integration/test_write_to_pyspark.py
@@ -9,7 +9,7 @@
from deltalake import DeltaTable, write_deltalake
from deltalake.exceptions import DeltaProtocolError

from .utils import assert_spark_read_equal, get_spark
from .utils import assert_spark_read_equal, get_spark, run_stream_with_checkpoint

try:
import delta
@@ -198,3 +198,79 @@ def test_spark_read_repair_run(tmp_path):
)

assert latest_operation_metrics["operationMetrics"] is not None


@pytest.mark.pyspark
@pytest.mark.integration
def test_spark_stream_schema_evolution(tmp_path: pathlib.Path):
"""https://github.com/delta-io/delta-rs/issues/3274"""
"""
This test ensures that Spark can still read from tables following
a schema evolution write, since old behavior was to generate a new table ID
between schema evolution runs, which caused Spark to error thinking the table had changed.
"""

data_first_write = pa.array(
[
{"name": "Alice", "age": 30, "details": {"email": "[email protected]"}},
{"name": "Bob", "age": 25, "details": {"email": "[email protected]"}},
]
)

data_second_write = pa.array(
[
{
"name": "Charlie",
"age": 35,
"details": {"address": "123 Main St", "email": "[email protected]"},
},
{
"name": "Diana",
"age": 28,
"details": {"address": "456 Elm St", "email": "[email protected]"},
},
]
)

schema_first_write = pa.schema(
[
("name", pa.string()),
("age", pa.int64()),
("details", pa.struct([("email", pa.string())])),
]
)

schema_second_write = pa.schema(
[
("name", pa.string()),
("age", pa.int64()),
(
"details",
pa.struct(
[
("address", pa.string()),
("email", pa.string()),
]
),
),
]
)
table_first_write = pa.Table.from_pylist(
data_first_write, schema=schema_first_write
)
table_second_write = pa.Table.from_pylist(
data_second_write, schema=schema_second_write
)

write_deltalake(tmp_path, table_first_write, mode="append", engine="rust")

run_stream_with_checkpoint(tmp_path.as_posix())

write_deltalake(
tmp_path, table_second_write, mode="append", engine="rust", schema_mode="merge"
)

run_stream_with_checkpoint(tmp_path.as_posix())

# For this test we don't care about the data; we just let any
# exceptions propagate and surface as a test failure
18 changes: 18 additions & 0 deletions python/tests/pyspark_integration/utils.py
@@ -47,3 +47,21 @@ def assert_spark_read_equal(
.sort_values(sort_by, ignore_index=True)
.drop(incompatible_types, axis="columns", errors="ignore"),
)


def run_stream_with_checkpoint(source_table: str):
spark = get_spark()

stream_path = source_table + "/stream"
checkpoint_path = stream_path + "/streaming_checkpoints/"

streaming_df = spark.readStream.format("delta").load(source_table)
query = (
streaming_df.writeStream.format("delta")
.outputMode("append")
.option("checkpointLocation", checkpoint_path)
.option("mergeSchema", "true")
.start(stream_path)
)
query.processAllAvailable()
query.stop()
68 changes: 68 additions & 0 deletions python/tests/test_writer.py
@@ -2053,3 +2053,71 @@ def test_write_type_coercion_predicate(tmp_path: pathlib.Path):
mode="overwrite",
delta_write_options=dict(engine="rust", predicate="C = 'a'"),
)


def test_write_schema_evolved_same_metadata_id(tmp_path):
"""https://github.com/delta-io/delta-rs/issues/3274"""

data_first_write = pa.array(
[
{"name": "Alice", "age": 30, "details": {"email": "[email protected]"}},
{"name": "Bob", "age": 25, "details": {"email": "[email protected]"}},
]
)

data_second_write = pa.array(
[
{
"name": "Charlie",
"age": 35,
"details": {"address": "123 Main St", "email": "[email protected]"},
},
{
"name": "Diana",
"age": 28,
"details": {"address": "456 Elm St", "email": "[email protected]"},
},
]
)

schema_first_write = pa.schema(
[
("name", pa.string()),
("age", pa.int64()),
("details", pa.struct([("email", pa.string())])),
]
)

schema_second_write = pa.schema(
[
("name", pa.string()),
("age", pa.int64()),
(
"details",
pa.struct(
[
("address", pa.string()),
("email", pa.string()),
]
),
),
]
)
table_first_write = pa.Table.from_pylist(
data_first_write, schema=schema_first_write
)
table_second_write = pa.Table.from_pylist(
data_second_write, schema=schema_second_write
)

write_deltalake(tmp_path, table_first_write, mode="append", engine="rust")

first_metadata_id = DeltaTable(tmp_path).metadata().id

write_deltalake(
tmp_path, table_second_write, mode="append", engine="rust", schema_mode="merge"
)

second_metadata_id = DeltaTable(tmp_path).metadata().id

assert first_metadata_id == second_metadata_id
