

add pyspark stream integration test with new schema evolution id behavior

Signed-off-by: Liam Murphy <[email protected]>
liamphmurphy committed Feb 27, 2025
1 parent 0c90760 commit e715999
Showing 2 changed files with 93 additions and 1 deletion.
77 changes: 76 additions & 1 deletion python/tests/pyspark_integration/test_write_to_pyspark.py
@@ -9,7 +9,7 @@
from deltalake import DeltaTable, write_deltalake
from deltalake.exceptions import DeltaProtocolError

from .utils import assert_spark_read_equal, get_spark
from .utils import assert_spark_read_equal, get_spark, run_stream_with_checkpoint

try:
import delta
@@ -198,3 +198,78 @@ def test_spark_read_repair_run(tmp_path):
)

assert latest_operation_metrics["operationMetrics"] is not None

@pytest.mark.pyspark
@pytest.mark.integration
def test_spark_stream_schema_evolution(tmp_path: pathlib.Path):
"""https://github.com/delta-io/delta-rs/issues/3274"""
"""
This test ensures that Spark can still read from tables following
a schema evolution write, since old behavior was to generate a new table ID
between schema evolution runs, which caused Spark to error thinking the table had changed.
"""

data_first_write = pa.array(
[
{"name": "Alice", "age": 30, "details": {"email": "[email protected]"}},
{"name": "Bob", "age": 25, "details": {"email": "[email protected]"}},
]
)

data_second_write = pa.array(
[
{
"name": "Charlie",
"age": 35,
"details": {"address": "123 Main St", "email": "[email protected]"},
},
{
"name": "Diana",
"age": 28,
"details": {"address": "456 Elm St", "email": "[email protected]"},
},
]
)

schema_first_write = pa.schema(
[
("name", pa.string()),
("age", pa.int64()),
("details", pa.struct([("email", pa.string())])),
]
)

schema_second_write = pa.schema(
[
("name", pa.string()),
("age", pa.int64()),
(
"details",
pa.struct(
[
("address", pa.string()),
("email", pa.string()),
]
),
),
]
)
table_first_write = pa.Table.from_pylist(
data_first_write, schema=schema_first_write
)
table_second_write = pa.Table.from_pylist(
data_second_write, schema=schema_second_write
)

write_deltalake(tmp_path, table_first_write, mode="append", engine="rust")

run_stream_with_checkpoint(tmp_path.as_posix())

write_deltalake(
tmp_path, table_second_write, mode="append", engine="rust", schema_mode="merge"
)

run_stream_with_checkpoint(tmp_path.as_posix())

# For this test we don't care about the data; we just let any exceptions
# propagate so they surface as a test failure.
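
A follow-up assertion one could add (not part of this commit) is to check directly that the table ID survives the schema-evolution write, since that ID is what Spark's streaming source keys its state on. A minimal sketch, assuming deltalake's DeltaTable.metadata() exposes the table id:

# Hypothetical extra check, not in this commit: the table ID should be
# identical before and after the schema-evolution ("merge") write.
from deltalake import DeltaTable

def assert_table_id_stable(table_uri: str, do_evolving_write) -> None:
    id_before = DeltaTable(table_uri).metadata().id
    do_evolving_write()  # e.g. write_deltalake(..., schema_mode="merge")
    id_after = DeltaTable(table_uri).metadata().id
    assert id_before == id_after, "schema evolution must not rotate the table ID"

In the test above this would amount to reading DeltaTable(tmp_path).metadata().id before and after the second write_deltalake call.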
17 changes: 17 additions & 0 deletions python/tests/pyspark_integration/utils.py
@@ -47,3 +47,20 @@ def assert_spark_read_equal(
.sort_values(sort_by, ignore_index=True)
.drop(incompatible_types, axis="columns", errors="ignore"),
)


def run_stream_with_checkpoint(source_table: str):
    """Stream the Delta table at ``source_table`` into a Delta sink under the
    table directory, using a checkpoint so repeated runs resume from the same
    streaming state."""
    spark = get_spark()

    stream_path = source_table + "/stream"
    checkpoint_path = stream_path + "/streaming_checkpoints/"

    streaming_df = spark.readStream.format("delta").load(source_table)
    query = (
        streaming_df.writeStream.format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
        .option("mergeSchema", "true")
        .start(stream_path)
    )
    query.processAllAvailable()
    query.stop()
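
get_spark() itself is not shown in this diff. For context, a Delta-enabled session for these integration tests is typically built roughly as below; the exact options used by the real helper in utils.py are an assumption here:

# Rough sketch of a Delta-enabled SparkSession; the real get_spark() in
# utils.py may configure the session differently.
from pyspark.sql import SparkSession

def build_delta_spark_session() -> SparkSession:
    return (
        SparkSession.builder.appName("delta-rs-pyspark-integration")
        # Standard settings required for Spark to read and write Delta tables.
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
        .getOrCreate()
    )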
