diff --git a/crates/core/src/kernel/models/actions.rs b/crates/core/src/kernel/models/actions.rs
index f49ab2ce75..f09cd66d80 100644
--- a/crates/core/src/kernel/models/actions.rs
+++ b/crates/core/src/kernel/models/actions.rs
@@ -111,6 +111,12 @@ impl Metadata {
         self
     }
 
+    /// set the table ID in the metadata action
+    pub fn with_table_id(mut self, id: String) -> Self {
+        self.id = id;
+        self
+    }
+
     /// get the table schema
     pub fn schema(&self) -> DeltaResult<StructType> {
         Ok(serde_json::from_str(&self.schema_string)?)
diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs
index 49348415b4..e1096cb7bd 100644
--- a/crates/core/src/operations/write/mod.rs
+++ b/crates/core/src/operations/write/mod.rs
@@ -555,11 +555,14 @@ impl std::future::IntoFuture for WriteBuilder {
                     .apply_column_metadata_to_protocol(&schema_struct)?
                     .move_table_properties_into_features(&configuration);
 
-                let schema_action = Action::Metadata(Metadata::try_new(
-                    schema_struct,
-                    partition_columns.clone(),
-                    configuration,
-                )?);
+                let mut metadata =
+                    Metadata::try_new(schema_struct, partition_columns.clone(), configuration)?;
+                let existing_metadata_id = snapshot.metadata().id.clone();
+
+                if !existing_metadata_id.is_empty() {
+                    metadata = metadata.with_table_id(existing_metadata_id);
+                }
+                let schema_action = Action::Metadata(metadata);
                 actions.push(schema_action);
                 if current_protocol != &new_protocol {
                     actions.push(new_protocol.into())
diff --git a/python/tests/pyspark_integration/test_write_to_pyspark.py b/python/tests/pyspark_integration/test_write_to_pyspark.py
index f9e9d1824a..716103ae99 100644
--- a/python/tests/pyspark_integration/test_write_to_pyspark.py
+++ b/python/tests/pyspark_integration/test_write_to_pyspark.py
@@ -9,7 +9,7 @@
 from deltalake import DeltaTable, write_deltalake
 from deltalake.exceptions import DeltaProtocolError
 
-from .utils import assert_spark_read_equal, get_spark
+from .utils import assert_spark_read_equal, get_spark, run_stream_with_checkpoint
 
 try:
     import delta
@@ -198,3 +198,79 @@ def test_spark_read_repair_run(tmp_path):
     )
 
     assert latest_operation_metrics["operationMetrics"] is not None
+
+
+@pytest.mark.pyspark
+@pytest.mark.integration
+def test_spark_stream_schema_evolution(tmp_path: pathlib.Path):
+    """Regression test for https://github.com/delta-io/delta-rs/issues/3274.
+
+    Ensures that Spark can still read a table after a schema-evolution write.
+    The old behavior generated a new table ID on every schema-evolution write,
+    which caused Spark to error because it assumed the table had changed.
+    """
+
+    data_first_write = pa.array(
+        [
+            {"name": "Alice", "age": 30, "details": {"email": "alice@example.com"}},
+            {"name": "Bob", "age": 25, "details": {"email": "bob@example.com"}},
+        ]
+    )
+
+    data_second_write = pa.array(
+        [
+            {
+                "name": "Charlie",
+                "age": 35,
+                "details": {"address": "123 Main St", "email": "charlie@example.com"},
+            },
+            {
+                "name": "Diana",
+                "age": 28,
+                "details": {"address": "456 Elm St", "email": "diana@example.com"},
+            },
+        ]
+    )
+
+    schema_first_write = pa.schema(
+        [
+            ("name", pa.string()),
+            ("age", pa.int64()),
+            ("details", pa.struct([("email", pa.string())])),
+        ]
+    )
+
+    schema_second_write = pa.schema(
+        [
+            ("name", pa.string()),
+            ("age", pa.int64()),
+            (
+                "details",
+                pa.struct(
+                    [
+                        ("address", pa.string()),
+                        ("email", pa.string()),
+                    ]
+                ),
+            ),
+        ]
+    )
+    table_first_write = pa.Table.from_pylist(
+        data_first_write, schema=schema_first_write
+    )
+    table_second_write = pa.Table.from_pylist(
+        data_second_write, schema=schema_second_write
+    )
+
+    write_deltalake(tmp_path, table_first_write, mode="append", engine="rust")
+
+    run_stream_with_checkpoint(tmp_path.as_posix())
+
+    write_deltalake(
+        tmp_path, table_second_write, mode="append", engine="rust", schema_mode="merge"
+    )
+
+    run_stream_with_checkpoint(tmp_path.as_posix())
+
+    # For this test we don't care about the data; we just let any
+    # exceptions propagate and surface as a test failure.
diff --git a/python/tests/pyspark_integration/utils.py b/python/tests/pyspark_integration/utils.py
index 5ec23317a0..592248159b 100644
--- a/python/tests/pyspark_integration/utils.py
+++ b/python/tests/pyspark_integration/utils.py
@@ -47,3 +47,21 @@ def assert_spark_read_equal(
         .sort_values(sort_by, ignore_index=True)
         .drop(incompatible_types, axis="columns", errors="ignore"),
     )
+
+
+def run_stream_with_checkpoint(source_table: str):
+    spark = get_spark()
+
+    stream_path = source_table + "/stream"
+    checkpoint_path = stream_path + "/streaming_checkpoints/"
+
+    streaming_df = spark.readStream.format("delta").load(source_table)
+    query = (
+        streaming_df.writeStream.format("delta")
+        .outputMode("append")
+        .option("checkpointLocation", checkpoint_path)
+        .option("mergeSchema", "true")
+        .start(stream_path)
+    )
+    query.processAllAvailable()
+    query.stop()
diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py
index 6d09906866..114d4b6f7c 100644
--- a/python/tests/test_writer.py
+++ b/python/tests/test_writer.py
@@ -2053,3 +2053,71 @@ def test_write_type_coercion_predicate(tmp_path: pathlib.Path):
         mode="overwrite",
         delta_write_options=dict(engine="rust", predicate="C = 'a'"),
     )
+
+
+def test_write_schema_evolved_same_metadata_id(tmp_path: pathlib.Path):
+    """Schema evolution keeps the table ID (https://github.com/delta-io/delta-rs/issues/3274)."""
+
+    data_first_write = pa.array(
+        [
+            {"name": "Alice", "age": 30, "details": {"email": "alice@example.com"}},
+            {"name": "Bob", "age": 25, "details": {"email": "bob@example.com"}},
+        ]
+    )
+
+    data_second_write = pa.array(
+        [
+            {
+                "name": "Charlie",
+                "age": 35,
+                "details": {"address": "123 Main St", "email": "charlie@example.com"},
+            },
+            {
+                "name": "Diana",
+                "age": 28,
+                "details": {"address": "456 Elm St", "email": "diana@example.com"},
+            },
+        ]
+    )
+
+    schema_first_write = pa.schema(
+        [
+            ("name", pa.string()),
+            ("age", pa.int64()),
+            ("details", pa.struct([("email", pa.string())])),
+        ]
+    )
+
+    schema_second_write = pa.schema(
+        [
+            ("name", pa.string()),
+            ("age", pa.int64()),
+            (
+                "details",
+                pa.struct(
+                    [
+                        ("address", pa.string()),
+                        ("email", pa.string()),
+                    ]
+                ),
+            ),
+        ]
+    )
+    table_first_write = pa.Table.from_pylist(
+        data_first_write, schema=schema_first_write
+    )
+    table_second_write = pa.Table.from_pylist(
+        data_second_write, schema=schema_second_write
+    )
+
+    write_deltalake(tmp_path, table_first_write, mode="append", engine="rust")
+
+    first_metadata_id = DeltaTable(tmp_path).metadata().id
+
+    write_deltalake(
+        tmp_path, table_second_write, mode="append", engine="rust", schema_mode="merge"
+    )
+
+    second_metadata_id = DeltaTable(tmp_path).metadata().id
+
+    assert first_metadata_id == second_metadata_id
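
Note for reviewers (not part of the diff): the contract the write path above relies on is that `Metadata::try_new` generates a fresh table ID, and the new `with_table_id` builder overrides it with the ID already recorded in the snapshot, so readers that pin the table ID (e.g. Spark structured-streaming checkpoints) do not see the table as replaced. Below is a minimal Rust sketch of that contract; `test_schema()` is a hypothetical helper returning any valid `StructType`, and the argument shapes mirror the call site in `write/mod.rs`:

```rust
use std::collections::HashMap;

#[test]
fn with_table_id_overrides_generated_id() {
    // `test_schema()` is a hypothetical helper returning any valid StructType.
    let first =
        Metadata::try_new(test_schema(), Vec::<String>::new(), HashMap::new()).unwrap();

    // A second `try_new` call generates a distinct UUID; `with_table_id`
    // replaces it with the existing table's ID, as the write path now does
    // for schema-evolution commits.
    let second = Metadata::try_new(test_schema(), Vec::<String>::new(), HashMap::new())
        .unwrap()
        .with_table_id(first.id.clone());

    assert_eq!(first.id, second.id);
}
```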