Skip to content

Commit

Permalink
Add regression test
Browse files Browse the repository at this point in the history
  • Loading branch information
gtopper committed Dec 26, 2023
1 parent ebf1bed commit 8f149a1
Showing 1 changed file with 40 additions and 0 deletions.
40 changes: 40 additions & 0 deletions integration/test_flow_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,46 @@ def test_write_to_v3io_stream(assign_stream_teardown_test):
asyncio.run(async_test_write_to_v3io_stream(assign_stream_teardown_test))


async def async_test_write_to_v3io_stream_flow_reuse(stream_path, flow, iteration):

controller = flow.run()
for i in range(10):
await controller.emit(i)

await asyncio.sleep(5)

try:
shard0_data = await GetShardData().get_shard_data(f"{stream_path}/0")
assert shard0_data == [b"0", b"2", b"4", b"6", b"8"] * iteration
shard1_data = await GetShardData().get_shard_data(f"{stream_path}/1")
assert shard1_data == [b"1", b"3", b"5", b"7", b"9"] * iteration
finally:
await controller.terminate()
await controller.await_termination()


# ML-5351
def test_write_to_v3io_stream_flow_reuse(assign_stream_teardown_test):
stream_path = assign_stream_teardown_test
flow = build_flow(
[
AsyncEmitSource(),
Map(lambda x: str(x)),
StreamTarget(
V3ioDriver(),
stream_path,
sharding_func=lambda event: int(event.body),
batch_size=8,
shards=2,
full_event=False,
),
]
)

asyncio.run(async_test_write_to_v3io_stream_flow_reuse(stream_path, flow, 1))
asyncio.run(async_test_write_to_v3io_stream_flow_reuse(stream_path, flow, 2))


async def async_test_write_to_v3io_stream_full_event_readback(
setup_stream_teardown_test,
):
Expand Down

0 comments on commit 8f149a1

Please sign in to comment.